UBC Theses and Dissertations

UBC Theses Logo

UBC Theses and Dissertations

Distributed linear programming with Apache Spark Mohyedin Kermani, Ehsan 2016

Your browser doesn't seem to have a PDF viewer, please download the PDF to view this item.

Item Metadata

Download

Media
24-ubc_2017_february_mohyedinkermani_ehsan.pdf [ 845.89kB ]
Metadata
JSON: 24-1.0340337.json
JSON-LD: 24-1.0340337-ld.json
RDF/XML (Pretty): 24-1.0340337-rdf.xml
RDF/JSON: 24-1.0340337-rdf.json
Turtle: 24-1.0340337-turtle.txt
N-Triples: 24-1.0340337-rdf-ntriples.txt
Original Record: 24-1.0340337-source.json
Full Text
24-1.0340337-fulltext.txt
Citation
24-1.0340337.ris

Full Text

Distributed linear programming with Apache SparkbyEhsan Mohyedin KermaniB.Sc. Sharif University of Technology, 2011M.Sc. University of British Columbia, 2013A THESIS SUBMITTED IN PARTIAL FULFILLMENTOF THE REQUIREMENTS FOR THE DEGREE OFMaster of ScienceinTHE FACULTY OF GRADUATE AND POSTDOCTORALSTUDIES(Computer Science)The University of British Columbia(Vancouver)December 2016c© Ehsan Mohyedin Kermani, 2016AbstractFor this thesis project, we have implemented Mehrotra’s predictor-corrector interiorpoint algorithm on top of Apache Spark for solving large-scale linear programmingproblems. Our large-scale solver (Spark-LP) is unique because it is open-source,fault-tolerant and can be used on commodity cluster of machines. As a result,Spark-LP provides an opportunity to solve large-scale problems at the lowest pos-sible cost. We have assessed the performance and convergent results of our solveron self-generated, sparse and dense large-scale problems over small to medium-sized clusters, composed of 16 to 64 Amazon’s Elastic Computing Cloud r3.xlargeinstances. In conclusions, we have made important suggestions for breaking thecurrent structural limitations so that our solver can be used on heterogeneous clus-ters containing CPUs and GPUs on JVM environment without the usual numericallimitations and overheads.iiPrefaceAll the research ideas and methods explained in this thesis are the results of fruitfuldiscussions between Professor Uri Ascher, Professor Chen Greif and Ehsan Mo-hyedin Kermani. The software design and obtaining the results were executedby Ehsan Mohyedin Kermani. The manuscript preparations were conducted byEhsan Mohyedin Kermani with invaluable guidance from Uri Ascher and ChenGreif throughout this process.iiiTable of contentsAbstract . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . iiPreface . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . iiiTable of contents . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . ivList of tables . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . viList of figures . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . viiAcknowledgments . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . viii1 Introduction . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 12 Linear programming and primal dual interior point methods . . . . . 52.1 Linear programming . . . . . . . . . . . . . . . . . . . . . . . . . 52.1.1 Duality theorems and Farkas lemma . . . . . . . . . . . . . 62.2 Primal-dual interior point methods . . . . . . . . . . . . . . . . . . 92.2.1 A practical algorithm . . . . . . . . . . . . . . . . . . . . . 112.2.2 Starting point . . . . . . . . . . . . . . . . . . . . . . . . . 122.2.3 Mehrotra’s predictor-corrector algorithm . . . . . . . . . . . 143 MapReduce, Apache Spark and distributed matrix computations . . . 163.1 MapReduce . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 163.1.1 Programming model . . . . . . . . . . . . . . . . . . . . . 173.1.2 Execution model . . . . . . . . . . . . . . . . . . . . . . . 17iv3.1.3 Fault-tolerance . . . . . . . . . . . . . . . . . . . . . . . . 183.1.4 Advantages and disadvantages . . . . . . . . . . . . . . . . 193.2 Apache Spark as generalized MapReduce . . . . . . . . . . . . . . 203.2.1 Resilient distributed datasets . . . . . . . . . . . . . . . . . 203.2.2 Representation of RDDs and Spark execution model . . . . 213.2.3 Cluster mode . . . . . . . . . . . . . . . . . . . . . . . . . 233.2.4 Overview of Spark memory management . . . . . . . . . . 233.2.5 Advantages and disadvantages . . . . . . . . . . . . . . . . 253.3 Distributed linear algebra and optimization in Apache Spark . . . . 263.3.1 Efficient matrix computations . . . . . . . . . . . . . . . . 274 Distributed linear programming with Apache Spark . . . . . . . . . . 284.1 Methodology and software architecture design . . . . . . . . . . . . 284.1.1 Local versus distributed design . . . . . . . . . . . . . . . . 294.1.2 Unified interface . . . . . . . . . . . . . . . . . . . . . . . 364.2 Benchmark results . . . . . . . . . . . . . . . . . . . . . . . . . . . 364.2.1 Local mode . . . . . . . . . . . . . . . . . . . . . . . . . . 364.2.2 Cluster mode . . . . . . . . . . . . . . . . . . . . . . . . . 415 Conclusions . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 46Bibliography . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 48vList of tablesTable 4.1 Local result . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 38Table 4.2 Distributed results: n?,m? are logarithm values of n,m respec-tively, δ is the sparsity density, p is the number of partitions . . . 42viList of figuresFigure 3.1 [28, Left to right, DAG splits into stages and tasks] . . . . . . . 22Figure 3.2 [2, Cluster overview] . . . . . . . . . . . . . . . . . . . . . . . 23Figure 3.3 [25, Spark 1.6+ memory manamgent] . . . . . . . . . . . . . . 25Figure 4.1 [7, spark-1-1-mllib-performance-improvements] . . . . . . . . . 31Figure 4.2 [7, spark-1-1-mllib-performance-improvements] . . . . . . . . . 35Figure 4.3 Spark-UI example 1 . . . . . . . . . . . . . . . . . . . . . . . . 40Figure 4.4 Spark-UI example 2 . . . . . . . . . . . . . . . . . . . . . . . . 40Figure 4.5 Distributed performance results . . . . . . . . . . . . . . . . . . 43Figure 4.6 Average iteration time . . . . . . . . . . . . . . . . . . . . . . . 44Figure 4.7 Distributed convergent results . . . . . . . . . . . . . . . . . . 45viiAcknowledgmentsFirst and foremost, I would like to thank my supervisors Professor Uri Ascher andProfessor Chen Greif for all their help, trust and financial support throughout mystudy at the University of British Columbia (UBC). I am indebted to them for theinvaluable opportunity they gave me to join the scientific computing lab of the com-puter science department at UBC. I am grateful to Professor Mark Greenstreet forhis time in evaluating and second reading my thesis.I appreciate Apache Spark community friendly software development environ-ment and would like to thank Databricks company for granting me their academiccluster resources for finalizing the results.I would also like to express wholeheartedly, my sincere appreciation to my verysupportive family, my mother “Mehrzad”, my father “Hamid” and my sister “Af-soon” for all their love and endless encouragements throughout my life.As last but not least, my deepest appreciation goes to my beloved wife “Shadi”for her continuous love and immense support.viiiChapter 1IntroductionThe volume of sheer data that is being generated world wide, is increasing at agreater velocity than ever before. From Physics particle discovery data to socialmedia data, stock market data and so on. Data comes in different varieties, shapesand formats. The three mentioned characteristics have coined the word Big Data inindustrial side. Nonetheless, the emergence of Big Data in computational science,in particular high performance computing, dates back to a few decades before theindustrial recognition for large-scale numerical applications.In 2004, Google introduced the MapReduce framework [23] to analyze Big Datain order to extract knowledge and insight using large-scale (commodity) clusters ofmachines in a distributed way. The main features of MapReduce are handling ma-chine failure systematically (fault-tolerance), and its minimal programming frame-work where users need to express the computations in terms of two functions: mapand reduce. This simplistic model allows users to concentrate on their applica-tions by hiding the harder parts through various abstractions. However, MapReduceis insufficient when it comes to fast iterative applications because of the followingtwo main reasons:• MapReduce does not leverage data persistence well enough.• There is a significant I/O bottleneck in each iteration.On the other hand, in high performance computing where objectives are merelyhigh accuracy and fast execution of computations, the current solution for analyz-1ing Big Data is the computational and programming framework known as messagepassing interface (MPI). MPI supports a variety of functionalities with intricate de-tails that users must explicitly control, resulting into high accuracy and fast execu-tion of computations [15]. However, MPI is not a feasible option from the industrialpoint of view dealing with Big Data, as the needed infrastructure is very expensiveand hard to maintain [26].In 2010, Apache Spark generalized MapReduce by introducing its distributedmemory abstraction, called Resilient Distributed Datasets (RDD) [28]. ApacheSpark lies between MapReduce and MPI and has closed the gap between industrialand high performance computing perspectives. In fact, Spark solved the aforemen-tioned issues with MapReduce by• Leveraging different levels of data persistence in memory so that iterativeapplications can run much faster.• Removing the storage requirement for intermediate results.Apache Spark as the distributed computing platform for Big Data applications,has provided distributed linear algebra and convex optimization supports throughone simple assumption that divides vector and matrix operations: vectors are localand matrices must be distributed across the cluster. That is, since matrices arequadratically taking more storage space than vectors, then vector operations shouldbe kept local while matrix operations should be distributed across the cluster [30].For example, matrix-matrix operations are done over the cluster and matrix-localvector operations can be efficiently done by broadcasting the local vector to workerscontaining the matrix partitions.Our main motivation is to assess Spark for large-scale numerical applicationswhich are closer to high performance computing and to analyze its advantages anddisadvantages. For this reason, we have chosen linear programming which is oneof the most fundamental and well-known optimization techniques in scientific com-puting area.2A linear programming in standard form is described asmin cT xsubject to Ax = bx≥ 0where c,x are in Rn, b is in Rm, A is an m×n real-valued matrix.We have implemented Mehrotra’s predictor-corrector interior point algorithm,described in Section 2.2.3, to solve large-scale linear programming problems on topof Apache Spark in [20, Spark-LP]. Briefly, when a user specifies c,b as distributedvectors (RDD[DenseVector]) and AT as distributed matrix (RDD[Vector])by determining the same number of partitions on their underlying data, the Sparkdiver program is connected to the cluster via a (shared) SparkContext instanceas described in Section 3.2.3. Then, the resource manager allocates Spark workers(executors), distributes c,b and A among workers, and the scheduler schedules tasksto run in executors in parallel. Note that, as Mehrotra’s algorithm iteratively updatesx, the computations in each iteration happen in multiple parallel steps dependingon the number of partitions and the total number of available cores in the cluster.Additionally, Spark tries to manage most of the computations locally within eachworker and limits the number of communications across workers.To leverage hardware accelerations and fast local computations, we have usedBasic Linear Algebra Subprograms (BLAS) and Linear Algebra Package (LA-PACK) native routines written in Fortran, through the Java Native Interface (JNI)provided by Netlib-java [17].Our solver [20] is open-source, fault-tolerant and can be used on commodityclusters of machines for solving large-scale linear programming problems. As aresult, it provides an opportunity to solve large-scale problems at the lowest possiblecost.Note that there are a number of software packages for solving large-scale lin-ear programming problems but typically they are not fault-tolerant, and thereforethey require access to expensive infinite bandwidth clusters. As such, they are notstraightforward to use in Big Data and large-scale setting on commodity clusters.In addition, the majority of large-scale solvers are proprietary and very expensive3to use.For testing/tuning purposes and describing the inevitable overheads that Sparkbrings to the table, compared to non fault-tolerant solvers with low level controlssuch as GLPK [10], we have included some of the local results from solving smallsized Netlib [18] problems in Section 4.2.1. We have generated large-scale prob-lems in order to assess the performance of our solver on large-scale problems oversmall to medium clusters. The results are provided and analyzed in Section 4.2.2.Finally in Chapter 5, we have made suggestions for improving the current struc-tural limitations namely how to use a heterogeneous cluster consisting of CPUs andGPUs by using RDD[INDArray] provided in [16, ND4J] library.4Chapter 2Linear programming and primaldual interior point methodsIn this chapter, first we briefly introduce some of the basic properties of linearprogramming. Then, we end this chapter with detailed description of Mehrohra’spredictor-corrector interior point algorithm given in Section 2.2.3.2.1 Linear programmingLinear programming (LP) is a special case of mathematical optimization whereboth the objective and constraints are described by linear equations. Formally, itsstandard form is written asmin cT xsubject to Ax = bx≥ 0(2.1)where c,x are in Rn, b is in Rm, A is an m×n real-valued matrix and (·)T is thetranspose operation. Throughout, we will assume rank(A) = m≤ n.Associated to the above LP problem in primal form, is the dual problem as5followsmax bTλsubject to ATλ + s = cs≥ 0(2.2)where λ ∈ Rm and s ∈ Rn.Historically [33], Dantzig introduced the first algorithm, called thes simplexmethod to efficiently tackle most LP cases in 1940s. Later in 1979, Khachiyanshowed that LP problems are solvable in polynomial time. In 1984, Karmaker intro-duced a new class of methods known as interior-point methods which led to the dis-covery of primal-dual algorithms. Among such algorithms, Mehrohra’s predictor-corrector interior point method has had significant impact on computational sideand software aspect of LP solvers since 1990. This method is the main subject ofthis chapter.2.1.1 Duality theorems and Farkas lemmaFirst, we introduce the following definitionsDefinition 2.1.1. LetFP = {x |Ax= b,x≥ 0} be the set of primal feasible solutionsof (2.1). Then, (2.1)a) is infeasible, ifFP = /0b) is unbounded, if there is a sequence (xk)n≥1 of elements of FP such thatlimk→∞ cT xk =−∞Definition 2.1.2. A primal optimal solution x? of (2.1) is a primal feasible solutionsuch that cT x? ≤ cT x for all x ∈FP.Similarly, we have the following definitions for the dual problem (2.2)Definition 2.1.3. LetFD = {(λ ,s) | ATλ + s = b,s≥ 0} be the set of dual feasiblesolutions of (2.2). Then, (2.2)a) is infeasible, ifFD = /06b) is unbounded, if there is a sequence (λ k,sk)n≥1 of elements of FD such thatlimk→∞ bTλ k =+∞Definition 2.1.4. A dual optimal solution (λ ?,s?) of (2.2) is a dual feasible solutionsuch that bTλ ? ≥ bTλ for all (λ ,s) ∈FD.Therefore, the following results bridge the gap between the solutions of primaland dual LP problemsTheorem 2.1.5 (Weak Duality [32]). Let x ∈ FP and (λ ,s) ∈ FD. Then, cT x ≥bTλ , where equality holds if and only if xT (c−ATλ ) = 0, or equivalently,xi(c−ATλ )i = 0 for all 1≤ i≤ nProof. We have Ax = b, then bTλ = (Ax)Tλ = xT ATλ . Therefore,cT x−bTλ = cT x− xT ATλ = xT c− xT ATλ = xT (c−ATλ ) = xT s≥ 0.The condition xi(c−ATλ )i = xisi = 0 for all 1 ≤ i ≤ n, is known as the com-plementarity condition and µ = cT x−bTλ is called the duality gap for the solutionx ∈FP of (2.1) and λ ∈FD of (2.2).The following consequences of the Weak Duality Theorem are immediateCorollary 2.1.6. a) If (2.1) is unbounded then (2.2) is infeasible.b) If (2.2) is unbounded then (2.1) is infeasible.c) If there is a x? ∈FP and an λ ? ∈FD such that cT x? = bTλ ?, then x? is anoptimal solution to (2.1) and λ ? is an optimal solution to (2.2).The corollary Theorem 2.1.6 establishes sufficient conditions for the infeasibil-ity of (2.1) and (2.2) as well as the optimality of a pair of primal and dual solutions.However, infeasibility of one of the problems does not imply unboundedness ofthe other and the true reverse statement can be obtained by the following theorems,known as Farkas Lemma7Theorem 2.1.7 (Primal Farkas Lemma [24]). Exactly one of the two systems has afeasible solutiona) Ax = b, x≥ 0b) ATλ ≤ 0, bTλ > 0Theorem 2.1.8 (Dual Farkas Lemma). Exactly one of the two systems has a feasiblesolutiona) cT x< 0, Ax = 0, x≥ 0b) ATλ ≤ cWe can use the Farkas Lemma to prove the followingCorollary 2.1.9. a) If (2.1) is infeasible, then (2.2) is either infeasible or un-bounded.b) If (2.2) is infeasible, then (2.1) is either infeasible or unbounded.Proof. We prove a) and the proof of b) will be similar. If (2.1) is infeasible, thenthere is no x ∈ Rn such thatAx = b, x≥ 0.Then, by Primal Farkas Lemma Theorem 2.1.7, there exists a λ ∈Rm, such thatATλ ≤ 0, bTλ > 0.If (2.2) is infeasible, then we are done. But if (2.2) is feasible, then there existsλ¯ ∈ Rm such that AT λ¯ ≤ c and for any α > 0, we haveAT (λ¯ +αλ )≤ cresulting intolimα→+∞bT (αλ + λ¯ ) = +∞.8Hence, (2.2) is unbounded.The above theorem establishes the reverse statements of corollary Theorem 2.1.6parts a) and b). The reverse of part c) is highly non-trivial. We know that ifboth (2.1), (2.2) have feasible solutions, by Weak Duality, the optimal values arebounded. However, it is not clear if both primal and dual problems have optimalsolutions, whether the optimal objective values of primal and dual are equal or not.That is, in fact, reflected in the following theoremTheorem 2.1.10 (Strong Duality [22]). If one of the primal or dual problems hasa finite optimal value, then so does the other and the optimal objective values areequal.That is, cT x? = bTλ ?, where x? is a primal optimal solution and λ ? is a dualoptimal solution.2.2 Primal-dual interior point methodsThe solution of primal (2.1) and dual (2.2) problems (x,λ ,s) are characterized bythe Karush-Kuhn-Tucker conditionsATλ + s = cAx = bx≥ 0s≥ 0xisi = 0, 1≤ i≤ nPrimal-dual methods find solutions (x?,λ ?,s?) of the above system, by applyingNewton’s method and modifying search directions and step lengths so that we havethe strict inequality xT s> 0 in every iteration.To obtain primal-dual interior point methods, we can write the above equationsin a single mapping as follows9F(x,λ ,s) =ATλ + s− cAx−bXSe= 0, xT s≥ 0where X = diag(x1, · · · ,xn), S = diag(s1, · · · ,sn) and e is the vector of all 1s.To compute the Newton search directions, we need to solveJ(x,λ ,s)∆x∆λ∆s=−F(x,λ ,s)whereJ(x,λ ,s) =0 AT IA 0 0S 0 Xis the Jacobian of F.In order to preserve the condition xT s > 0, we can do line search along theNewton direction and define the updates as(x,λ ,s)+α(∆x,∆λ ,∆s)for some α ∈ (0,1].In most primal-dual methods, the goal is milder than directly solving the aboveequations. It is about taking a Newton step towards a point which xisi = σµ, whereµ =n∑i=1xisin=xT snis the current duality measure and σ ∈ [0,1] is the reduction factor in the dualitymeasure (also known as the centering parameter).Thus, the simplified step direction is computed by solving100 AT IA 0 0S 0 X∆x∆λ∆s= −rc−rb−XSe+σµe (2.3)where rb = Ax−b and rc = ATλ + s−c. Therefore, here is the pseudo-code forthe primal-dual interior point methods algorithm discussed aboveprocedure PRIMAL-DUAL INTERIOR POINT METHODS(c,A,b) [27, page 396]Given (x0,λ 0,s0) where (x0)T s0 > 0for k = 0,1,2, · · · doChoose σk ∈ [0,1] and solve 0 AT IA 0 0Sk 0 Xk∆xk∆λ k∆sk= −rkc−rkb−XkSke+σkµkewhere µk =(xk)T skn.Set (xk+1,λ k+1,sk+1) = (xk,λ k,sk)+αk(∆xk,∆λ k,∆sk)for some αk such that (xk+1)T sk+1 > 0.2.2.1 A practical algorithmNewton’s method forms a linear model (linearization) around the current point andsolves Equation 2.3 to obtain a search direction, this is known as affine scalingdirection. This linearization has an error in modeling the equation xisi = 0 for1≤ i≤ n. A key point in practical algorithms is their use of corrector step [27].That is, if we consider the affine scaling direction obtained from Equation 2.30 AT IA 0 0S 0 X∆xaff∆λ aff∆saff= −rc−rb−XSe+σµe (2.4)then, taking a full step in this direction will lead us to11(xi+∆xaffi )(si+∆saffi ) = xisi+ xi∆saffi + si∆xaffi +∆xaffi ∆saffi = ∆xaffi ∆saffiHowever, that shows that the updated xisi is ∆xaffi ∆saffi not the desired value of0. To correct that, we can solve the following system0 AT IA 0 0S 0 X∆xcor∆λ cor∆scor= 00−XaffSaffe (2.5)Therefore, we need to combine the two solutions from Equation 2.4 and Equa-tion 2.5 to obtain(∆xaff,∆λ aff,∆saff)+(∆xcor,∆λ cor,∆scor).In most practical cases, the above update reduces the duality gap more than pureaffine scaling update.2.2.2 Starting pointThe choice of an starting point (x0,λ 0,s0) is crucial in making the algorithm robustto failure in convergence. The following is a heuristic that finds a starting pointsatisfying the equality constraints in primal and dual problems reasonably well,while ensuring the positivity of x and s. We first solve [27, page 410]minx12xT x subject to Ax = bmin(λ ,s)12sT s subject to ATλ + s = cTheir solutions are x˜ = AT (AAT )−1b, λ˜ = (AAT )−1Ac, s˜ = c−AT λ˜ .We need to adjust the solutions so that they have positive components. There-fore, letδx = max{−1.5minix˜i,0}, δs = max{−1.5minis˜i,0},12and we update x˜, s˜ as followsxˆ = x˜+δxe, sˆ = s˜+δse,where e = (1,1, · · · ,1)T .To ensure the starting points are not too close to zero, we defineδ˜x =12xˆT sˆeT sˆ, δ˜s =12xˆT sˆeT xˆ.Thus, the starting points arex0 = xˆ+ ˆδxeλ 0 = λ˜s0 = sˆ+ δˆse(2.6)Simplifying the equations Equation 2.4 and Equation 2.5 lead us to the follow-ing Mehrotra’s predictor-corrector algorithm.132.2.3 Mehrotra’s predictor-corrector algorithmThe following is the Mehrotra’s predictor-corrector algorithm that we will use laterprocedure SOLVE(c, A, b, tol, maxIter)Set x0,λ0,s0← init(c, A, b) from 2.6.Set iter← 1.while not converged and iter≤maxIter doSet rb← Ax−b, rc← ATλ + s− c, and D← S−1/2X1/2.Solve using Cholesky decomposition:AD2AT∆λ aff =−rb+AT (−D2rc+ x).Set ∆saff←−rc−AT∆λ aff, and ∆xaff←−x−D2∆saff.SetαPriaff ←min{1, mini:∆xaffi <0− xi∆xaffi},αDualaff ←min{1, mini:∆saffi <0− si∆saffi}.Set σ ← (µaff/µ)3 where µ is the duality gap sT x/n.Solve using Cholesky decomposition:AD2AT∆λ =−rb+AD2(−rc+ s+X−1∆Xaff∆Saffe−σµX−1e).Update ∆s←−rc−AT∆λ and∆x←−D2∆s− x−S−1∆Xaff∆Saffe+σµS−1e.Choose ηiter ∈ [0.9,1) and computeαPriiter←min{1,ηiterαPriiter},αDualiter ←min{1,ηiterαDualiter }.Update x← x+αPriiter∆x, and (λ ,s)← (λ ,s)+αDualiter (∆λ ,∆s).return (x,λ ,s)14and the convergence condition is [33, page 226]‖rb‖1+‖b‖ < tol and‖rc‖1+‖c‖ < tol and|cT x−bTλ |1+ |cT x| < tolwith usual tol = 10−8, where ‖.‖ is the Euclidean norm.15Chapter 3MapReduce, Apache Spark anddistributed matrix computationsIn this chapter, we introduce MapReduce framework for analyzing Big Data anddescribe some of its advantages and disadvantages. The framework is introducedin Section 3.1. Next, we explain the generalized MapReduce framework and itsimplementation in Apache Spark project in Section 3.2. Finally, in Section 3.3 wedescribe how efficient distributed linear algebra support is made possible in ApacheSpark.3.1 MapReduceIn 2004, Google introduced a new computational and programming framework foranalyzing Big Data called MapReduce. In short, users of MapReduce frameworkspecify the computation in terms of a map and a reduce function, andthe underlying runtime system automatically parallelizes the computa-tion across large-scale clusters of machines, handles machine failures(fault-tolerant) and schedules inter-machine communication to makeefficient use of the network and disk [23].In the rest of this section, we explain the programming model (Section 3.1.1)and the execution model (Section 3.1.2). The advantages and disadvantages ofMapReduce particularly for iterative applications are discussed in Section 3.1.4.163.1.1 Programming modelThe major advantage of MapReduce is in hiding intricate but tedious details such asparallelization, fault-tolerance and inter-machine communication from users. Con-sequently, it allows users to merely think about their applications by expressingcomputations in terms of two functions: map and reduce.This simplistic feature might seem very restrictive at first, but it actually is cru-cial in solving many problems involving Big Data [23]. We will talk about itsadvantages and disadvantages later in Section 3.1.4.The map function accepts a set of pairs of key/value as input and produces an-other set of key/value intermediate pairs. Then, under the hood, the MapReducelibrary is responsible for grouping all intermediate values corresponding to a samekey and passes them to the reduce function.The reduce function accepts an intermediate key with its set of correspondingvalues and outputs a possibly smaller set of values. It is worth noting that theintermediate results are handed to the reduce via an iterator to allow handlingsets of values that might not fit in memory buffer.3.1.2 Execution modelWhen a MapReduce application is ready for execution, the following steps willoccur [23]1. One machine in our cluster is designated as master and others are consideredas workers. The input files are divided into M pieces (typically each piece inthe range of 16−64 MB). Then copies (forks) of the program are sent via thenetwork to all workers.2. For each MapReduce job, there are M map tasks and R reduce tasks. Themaster picks idle workers and assigns a map task and a reduce task to each ofthem. The master also maintains a set of states (idle, in-progress, completed)for every map and reduce task in a special data structure.3. A worker with a map task reads an input piece of data from containing key/-value pairs and feeds them to the user-defined map function. The output (in-17termediate key/value pairs) is stored in memory buffer of the worker. Then,the output is written to local disk, partitioned into R parts by a partitioningfunction. Moreover, the locations of these buffered pairs in local memory aresent to the master which is responsible for distributing these locations to idleworkers for the reduce phase.4. When a reduce worker is notified by the master about the mentioned loca-tions, it uses the Remote Procedure Call (RPC), which in this particular caseis a special protocol used in distributed computing for accessing data in aworker from another worker as if the other worker had the data already storedin its local disk, to read the buffered data stored in local disk. Once this isdone, the reduce worker groups together all occurrences of the same key bysorting the intermediate keys. Note that in sorting, if the intermediate datadoes not fit in memory, an external sort is then used.5. Once the sorting is done, the reduce worker iterates over the intermediatedata and applies the user-defined reduce function to a unique key and its cor-responding values. The output of the reduce function is appended to a finaloutput file for this reduce partition.6. The MapReduce job is completed when there is no more in-progress map orreduce tasks left.3.1.3 Fault-toleranceMapReduce has been designed to work with Big Data on large-scale clusters ofmachines. Therefore, it naturally must handle machine failures. This special mech-anism is called fault-tolerant.Fault-tolerant is guaranteed at data layer as well as application layer. That is, atdata layer multiple replicas (usually 2−3 times per piece) of data are stored in theunderlying global distributed file system and at application layer the mechanism isdescribed as follows:Periodically, the master pings each worker and waits to get a response back incertain amount of time. If the master does not get back a response from a worker, it18marks that worker as failed. Then depending on the worker state, the master behavesdifferently. In that, any in-progress map or reduce task on a failed worker is resetto idle and becomes eligible for rescheduling on other workers. And any completedmap tasks on a failed worker are reset back to initial idle state and becomes eligiblefor rescheduling, because the output of a failed worker is unreachable to others fromits local disk.Furthermore, when a map task is executed by a worker A and it fails, then an-other worker B is executing the map task, all other workers executing reduce tasksare notified by the master for this change, and any reduce task that has not alreadyread the data from A, will read the data from B instead.3.1.4 Advantages and disadvantagesSome important advantages of MapReduce are listed below:• Simplicity of the framework: allowing users to only think about the appli-cation by abstracting away the harder parts such as fault-tolerance and inter-machine communications.• Locality of computations: the MapReduce master has the location informa-tion of the input data and attempts to schedule a map task on machine thatcontains a replica of the corresponding data. If the master fails to do so, ittries to schedule a map task near a replica. This lowers the amount of networkcommunications significantly and speeds up the computations.• Fault-tolerance: as described in Section 3.1.3F or the purpose of this thesis, the disadvantages of MapReduce are as follows• Very limited programming model: simplicity of the framework is a double-edged sword. Despite offering great power to tackle Big Data problems, thereare still problems that become very hard to program and process with MapRe-duce.19• I/O bottleneck in iterative applications: due to how MapReduce was de-signed, for iterative applications that are the subject of this thesis, MapRe-duce is not suitable. Since in each iteration, data has to be read from and thenwritten back to the underlying distributed file system, therefore, this intro-duces more bottlenecks to iterative applications and significantly reduces thespeed.• MapReduce does not leverage data persistence enough. In particular, for iter-ative applications it slows the computations further down.In the next section, we will see how it is possible to solve the mentioned short-comings for iterative applications by generalizing the framework.3.2 Apache Spark as generalized MapReduceAs previously described, traditional MapReduce brings some overheads to itera-tive computations. In order to remove the overheads, such as relying on multiplereads and writes to the underlying stable distributed file system, a new abstraction,named Resilient Distributed Datasets (RDD) has been introduced and implementedin Apache Spark project using Scala [19] on Java Virtual Machine (JVM) environ-ment.To understand why RDD abstraction is powerful, we first need to explain moreabout RDD, its representation and the operations involved.3.2.1 Resilient distributed datasetsIn short, RDD is an immutable (read-only), distributed collection of objects. RDDscan be created either from a data in stable storage or from other RDDs through spe-cial operations. There are two types of operations acting of RDDs: transformationsand actions (consider transformations as the generalized notion of map in MapRe-duce and actions as the generalized notion of reduce). More precisely, transforma-tions are lazy operations that can be chained together and actions are operations thatlaunch computations and return values.20Exploiting the lazy feature of transformations, RDDs need not to be material-ized all the time throughout an application. Moreover, an RDD “knows” how itwas created from previous RDDs by tracking its lineage. This is so powerful thatit not only solves I/O bottleneck in iterative applications, but also it enables furtheroptimizations that is explained in [31]. Note that unlike in MapReduce, no inter-mediate results need to be stored multiple times to ensure fault-tolerance. That is,a lost partition of an RDD can be automatically recomputed by following back itslineage. In essence, a program cannot reference an RDD that it cannot reconstructafter a failure [28].Lastly, RDDs have two important customizable features: persistence and par-titioning. That is, RDDs can be explicitly cached/persisted in memory or otherstorage strategies (including serialized in memory or disk, etc.) for efficient reuse.Additionally, RDD’s elements can be partitioned across machines based on a key ineach record to enable further placement optimization.3.2.2 Representation of RDDs and Spark execution modelFormally, an RDD is characterized by the following five items [28], [31]1. A set of partitions, which are atomic pieces of underlying dataset.2. A set of dependencies on parent RDDs.3. A function for computing the dataset based on its parents.4. Optionally, a partitioning function.5. Optionally, a list of preferred locations for data placement.To use Spark, an application developer write a driver program that connects toa cluster of workers through a SparkContext instance. Note that Spark code onthe driver keeps track of the RDDs’ lineage under the hood.Any Spark’s application has its own Directed Acyclic Graph (DAG) consistingof RDDs as vertices and transformations/actions as edges. When Spark driver wants21to evaluate an action, it divides the DAG into stages at DAG-Scheduler and more im-portantly, optimization happens through pipelining the transformations (lazy eval-uation of transformations plays a crucial role here). Then, each stage is scheduledfurther into tasks at the Task-Scheduler where actual computations start to happen.It is necessary to mention that when driver wants to evaluate a transformationsuch as map on an RDD, it passes closures (function literals) to the underlying data.In Scala, each closure is a Java object, therefore, it is serialized and sent to anotherworker node in the cluster. Then it will be loaded and deserialized, ready to operateon underlying data.The following illustrates the split of DAG into stages and tasks. Moreover, itshows how several tasks with narrow dependencies are optimized via pipeliningthem into a single task.Figure 3.1: [28, Left to right, DAG splits into stages and tasks]223.2.3 Cluster modeSpark driver is the process where the main method runs. As described earlier, aftersplitting application DAG into stages then into tasks, the driver schedules tasksfor executions. In detail, the Spark driver is first connected to the cluster viaSparkContext object. More precisely, SparkContext can connect to sev-eral types of cluster managers responsible for resource allocations and coordinatingtasks. Second, once connected, Spark acquires worker nodes’ executors. Executorsare workers’ processes which are responsible for executing individual tasks andstoring data in a given Spark job. Finally, the driver sends the serialized applicationcode to executors to run tasks. It is worth mentioning that, if any worker dies orruns slowly, its tasks will be sent to different executors to be processed again.Figure 3.2: [2, Cluster overview]3.2.4 Overview of Spark memory managementUnderstanding Spark memory management is critical for getting the best resultsfrom an application. We start by specifying that any Spark process that works on acluster or local machine is a JVM process. The user can configure JVM heap sizeby passing flag options. Spark divides the JVM heap into several parts described asfollows• Reversed memory: Spark sets aside 300 MB of memory which will be un-23touchable and in fact, it sets the limit on what the user can allocate for Sparkusage.• Spark memory: This memory pool is directly managed by Spark. It can bemodified by the option spark.memory.fraction. This part consists oftwo smaller parts:– Storage memory: This is used for both storing cached data and for tem-porary space serialized data. spark.memory.storageFractionis controlling by the storage memory with a typical value of 50% or60% of Spark memory.– Execution memory: This is used for storing objects required for execu-tion of Spark tasks. When not enough memory is available, it supportsspilling to disk. Its size is what is left in spark.memory.fraction.The size of Spark memory can be calculated as(Java heap−Reserved memory)×spark.memory.fraction.• User memory: This memory pool remains after the allocation of Spark mem-ory and it is left to the user to use it appropriately. Its size is calculated as(Java heap−Reserved memory)× (1−spark.memory.fraction).The following illustration summarizes the above explanations in Figure 3.324Figure 3.3: [25, Spark 1.6+ memory manamgent]3.2.5 Advantages and disadvantagesSpark with its distributed memory abstraction (RDD), provides a limited program-ming interface due to its immutable nature and coarse-grained transformations. Butstill the creators of Spark have made use of the abstraction for a wide class ofapplications and made many improvements as of Spark-2.0.0 by adding more fine-grained abstractions which are beyond the scope of our brief descriptions.It is somewhat surprising that the RDDs have high expressible power over pre-viously defined abstractions. In fact, RDDs can be used to reconstruct the numberof cluster programming models with ease [28]. Major applications that have beenintegrated in Spark are: SQL, MLlib (machine learning library), GraphX (graphprocessing library) and Streaming (batched stream processing library).Other advantages of Spark are efficient use of data sharing abstraction, avoiding25the cost of data replication and leveraging data persistence well enough.However, some disadvantages of Spark are inevitable high latency (althoughlower than MapReduce), in particular for applications involving stream of data.The other is for applications that make asynchronous fine-grained updates to sharedstate, such as an incremental web crawler [28]. From the high performance com-puting side, the main disadvantage is the speed. Because the mentioned advantagesfor Big Data applications weigh much higher than only the speed comparing to, forexample, MPI [26]. For the record, these are some of the main reasons of slowdown in speed: serialization, deserialization, communication costs, bandwidth inthe cluster, stragglers (slow workers) and JVM itself.3.3 Distributed linear algebra and optimization inApache SparkApache Spark as the most well-known open-source, distributed computing platformfor Big Data applications, has provided distributed linear algebra and convex op-timization supports through one simple assumption that divides vector and matrixoperations: vectors are local and matrices must be distributed across the cluster.That is, since matrices are quadratically taking more storage space than vectors,then vector operations should be local operations while matrix operations should bedistributed and involve the cluster [30]. For example, matrix-matrix operations aredone over the cluster and matrix-local vector operations can be efficiently done bybroadcasting the local vector to workers containing matrix pieces.Based on this assumption, there are built-in specially designed distributed linearalgebra module (linalg) and a distributed convex optimization module (optimiza-tion) for separable convex objective functions as part of MLlib (Spark’s machinelearning library) [30], [3].There have been three main distributed matrix data structures implemented inSpark with sparse and dense supports• CoordinatMatrix: which distributes an RDD of type MatrixEntrywith underlying data (i: Long, j: Long, value: Double)indicating specific position of a value in matrix. It is suitable for very sparse26matrix storage and operations.• BlockMatrix: which distributes an RDD of blocks(i: Int, j: Int, mat: Matrix)indicating specific locations of each block in a structured block formated ma-trix.• RowMatrix: which distributes an RDD of local row vectors. It is suitablefor talk-skinny or small-wide matrices and operations involving them.In addition to distributed matrix data structures, the local vector and matrix sup-ports are provided by the Scala library called Breeze [5]. Breeze supports efficientcomputations for sparse or dense local vectors or matrices.3.3.1 Efficient matrix computationsDepending on the underlying data structure for distributed matrices, there are differ-ent types of implementations for special matrix operations such as Singular ValueDecomposition (SVD), QR decomposition [30] and Gramian matrix computationusing DIMSUM [29] in Spark.For efficiency of computations, it is vital to fully utilize the hardware-specificlinear algebraic operations locally on a single node. To do that, Spark uses BLAS(Basic Linear Algebra Subroutines) interface with native Fortran speed. Native li-braries can be used in Scala through Java Native Interface (JNI) and interoperabilityof Scala with Java. Netlib-java [17] library provides the JNI and it is wrapped byBreeze library. The variety of BLAS implementations have been tested for linalgmodule and the results were reported in [30].In addition to native BLAS support, the native Linear Algebra Package (LA-PACK) and Arnoldi Package (ARPACK) written in Fortran are available throughNetlib-java. Optimized routines from LAPACK and ARPACK have been widelyutilized in linalg and optimization modules in Spark. For more details, see [3].27Chapter 4Distributed linear programming withApache SparkIn this chapter, we describe the logic behind our large-scale linear programmingsolver (Spark-LP) implemented in [20]. We explain in depth our software architec-ture design and present some of its advantages over other possible choices. Finally,we demonstrate performance results obtained from solving small-scale to large-scale problems in Section 4.2.4.1 Methodology and software architecture designAs described at the beginning of Chapter 2, the linear programming problem instandard form is given by the Equation 2.1 and for the sake of convenience wereproduce here:min cT xsubject to Ax = bx≥ 0We assume that the real valued constraint matrix A is of dimension m× n, wherem≤ n and rank(A) = m.Our goal is to implement Mehrotra’s predictor-corrector algorithm, described28in Section 2.2.3, to solve large-scale linear programming problems in a distributedway. To do so, because most of the operations involved in Section 2.2.3 are vectorspace operations, then the first step is to separate vector space interface and vectorspace operator interface. These abstractions help us implement operations involvinglocal vectors or matrices separately from operations involving distributed vectorsor matrices.4.1.1 Local versus distributed designVectorsAs mentioned in Section 3.3.1, in order to utilize hardware acceleration for localcomputations, we can use Netlib-java [17] which provides a Native Java Interface(JNI) to directly call BLAS [4] and LAPACK [14] routines written in Fortran. Thisfunctionality is already provided in Spark. Therefore, as an example, if a,b aretwo DenseVectors, we can define their local inner product aT b or their linearcombinations αa+βb with coefficients α,β as follows1 /∗ l o c a l c o m p u t a t i o n s : ∗ /2 i m p o r t o rg . apache . s p a r k . m l l i b . l i n a l g .{BLAS, DenseVec tor }34 d e f d o t ( a : DenseVector , b : DenseVec tor ) : Double = BLAS . d o t ( a , b )56 d e f combine ( a l p h a : Double ,7 a : DenseVector ,8 b e t a : Double ,9 b : DenseVec tor ) : DenseVec tor = {10 v a l r e t = a . copy11 BLAS . s c a l ( a lpha , r e t )12 BLAS . axpy ( be t a , b , r e t )13 r e t14 }Listing 4.1: Excerpts from [20]As we go from the built-in local vector and matrix data structures in Spark todistributed matrix data structures, the assumption of [30] that all vectors are local29but matrices are distributed (as explained in Section 3.3) is no longer sufficientfor our purpose. That is, since the objective vector x and its coefficient c do notnecessarily fit in a single machine, we need to define our own distributed vectordata structure.We notice that there are two viable choices to represent distributed vectors:RDD[Double] and RDD[DenseVector]. The latter is our choice as it offerssome important advantages over the former (also indicated in [21]), most impor-tantly is the computation speed.Because partitions of an RDD[DenseVector] consist of some number oflocal DenseVectors, operations involving RDD[DenseVector] can be exe-cuted much faster than the normal choice RDD[Double] by exploiting BLASoperations locally. For example, following up our previous local example, the in-ner product of two RDD[DenseVector] a,b with equal number of partitions andtheir linear combinations can be computed by using the previously defined dot andcombine over partitions and then aggregating the results. The aggregation can bedone via the aggregate function as follows (for simplicity we use DVector astype alias for RDD[DenseVector])1 /∗ d i s t r i b u t e d c o m p u t a t i o n s : ∗ /2 d e f d o t ( a : DVector , b : DVector ) : Double = {3 a . z i p ( b ) . a g g r e g a t e ( 0 . 0 ) (4 seqOp = ( acc : Double , ab : ( DenseVector , DenseVec tor ) ) => {5 acc + DenseVec to rSpace . d o t ( ab . 1 , ab . 2 )6 } ,7 combOp = ( acc1 : Double , acc2 : Double ) => acc1 + acc28 )9 }1011 d e f combine ( a l p h a : Double , a : DVector , b e t a : Double , b : DVector ) :DVector =12 i f ( a l p h a == 1 . 0 && b e t a == 1 . 0 ) {13 a . z i p ( b ) . map {14 c a s e ( a P a r t , b P a r t ) => {15 BLAS . axpy ( 1 . 0 , a P a r t , b P a r t ) / / b P a r t += a P a r t16 b P a r t17 }3018 }19 } e l s e {20 a . z i p ( b ) . map {21 c a s e ( a P a r t , b P a r t ) =>22 DenseVec to rSpace . combine ( a lpha , a P a r t , be t a , b P a r t ) .toDense23 }24 }Listing 4.2: Excerpts from [20]Spark’s great versatility lets us easily go from local to distributed computa-tions as shown above. Since the aggregate efficiency is critical for us, as an op-timization to our aggregate function we can use treeAggregate functionwith pre-specified depth to compute the aggregated results more efficiently thanaggregate as depicted below inFigure 4.1: [7, spark-1-1-mllib-performance-improvements]The main difference is that treeAggregate need not wait for all the resultsto be computed and sent back to driver. Instead it aggregates the partial results whenready, up to the final result. Therefore, the optimized distributed inner product is asfollows311 d e f d o t ( a : DVector , b : DVector , d e p t h : I n t = 2 ) : Double =2 a . z i p ( b ) . t r e e A g g r e g a t e ( 0 . 0 ) ( ( sum , x ) => sum + BLAS . d o t ( x . 1 , x. 2 ) , + , d e p t h )Listing 4.3: Excerpts from [20]MatricesIn all interesting linear programming problems that the number of unknowns ex-ceeds the number of equations, we notice if we assume that the number of rows inA of Equation 2.1 is small, then columns of A can be considered as local (sparse ordense) vectors. Thus, we can choose RDD[Vector] (with type alias DMatrix)as our distributed matrix data type. However, we can leverage RowMatrix inSection 3.3 with underlying distributed data type to our advantage and insteadof working with A, we can consider its transpose B = AT and exploit the imple-mented functionalities greatly. For our purpose, we have modified RowMatrix toLPRowMatrix in [20] to compute the Gramian matrix with less overhead for ourcomputations. The Gramian matrix computation such as BT B = AAT for initializa-tion, described in Section 2.2.2 and BT D2B= (DB)T (DB) in Mehrotra’s algorithm,described in Section 2.2.3, is critical for us. Based on our earlier assumption, the re-sult of BT B is a local symmetric positive definite matrix, therefore, we can computeits inverse with two LAPACK routines [14] described below as follows1. dpptrf: computes the Cholesky decomposition of a symmetric positive def-inite matrix in packed format2. dpptri: computes the inverse of a symmetric positive definite matrix fromits Cholesky decomposition computed by dpptrf321 i m p o r t com . g i t h u b . fommil . n e t l i b .LAPACK. { g e t I n s t a n c e => l a p a c k }2 i m p o r t o rg . n e t l i b . u t i l . intW34 d e f posSymDefInv (B : Array [ Double ] , m: I n t ) : Un i t = {5 v a l i n f o 1 = new intW ( 0 )6 l a p a c k . d p p t r f ( ”U” , m, B , i n f o 1 )7 v a l code1 = i n f o 1 . ‘ va l ‘8 a s s e r t ( code1 == 0 , s ” l a p a c k . r e t u r n e d $code1 . ” )9 v a l i n f o 2 = new intW ( 0 )10 l a p a c k . d p p t r i ( ”U” , m, B , i n f o 2 )11 v a l code2 = i n f o 2 . ‘ va l ‘12 a s s e r t ( code2 == 0 , s ” l a p a c k . r e t u r n e d $code2 . ” )13 }Listing 4.4: Excerpts from [20]Moreover, to solve(BT D2B)∆λ =−rb+BT D2(−rc+ s+X−1∆Xaff∆Saffe−σµX−1e)in Section 2.2.3 via Cholesky decomposition, we have used the LAPACK dppsvroutine designed for solving symmetric positive definite linear system of equationsin-place. Fortunately, this functionality exists in Spark linalg module.Note that there are other possible choices for distributed matrix data structures,such as BlockMatrix, as introduced in Section 3.3, which is only suitable if apriori, we know that B is composed of block matrices. That is the reason we havedecided to use the more general data structure.Matrix-matrix productAs mentioned above, the only distributed matrix-matrix product that we care aboutis the Gramian matrix computation in both the initialization Section 2.2.2 and themain algorithm Section 2.2.3. Since we represent B as an RDD[Vector] whererows are local vectors, we need to be careful about the maximum size of one-dimensional Vector to avoid overflow in our computation. Unfortunately, theunderlying array of numbers in Scala/Java is limited to 32 bits in length. As we willexplain more, that results into having the maximum number of 65535 columns in B33(rows in A). A priori, there is no limitations for the number of rows in B (columnsin A) and it all depends on the number of available memory in our cluster.The Gramian matrix is computed via the BLAS routine spr which adds theouter product vvT of a local vector row v in each partition inplace locally and theresults are aggregated from the executors with treeAggregate. Formally,BT B =n∑i=1vivTiwhere vi’s are local row vectors in B as RDD[Vector].Matrix-vector productThe matrix-vector product captures most of the computations in Section 2.2.3, de-pending on the output data type, we need to take care of three different cases. Notethat for simplicity, we use DVector and DMatrix as type aliases for RDD[DenseVector]and RDD[Vector], respectively. Moreover, for all the operations we require thenumber of partitions in DMatrix and DVector be equal. The details are as fol-lows:• DMatrix × Vector = DVectorThis is the case where a distributed matrix is multiplied to a local vector andthe result is a distributed vector. We can take advantage of the local vectorand broadcast it to all the executors where each contains part of DMatrix.Therefore, all the inner product computations happen locally within executorsand no further communication is required.For more details, see LinopMatrix.scala in [20].34Figure 4.2: [7, spark-1-1-mllib-performance-improvements]• DMatrix × DVector = VectorThis is the case where the adjoint of a distributed matrix is multiplied to a dis-tributed vector (all must have the equal number of partitions) and outputs a lo-cal vector. The implementation is done via zipPartitions which alignscorresponding partitions of DMatrix and DVector together. Then, by aniterator partial multiplications are computed and the final sum is done viatreeAggregate. For more details, see LinopMatrixAdjoint.scalain [20].• DVector × DMatrix = DMatrixThis very special case happens when a diagonal matrix D in Section 2.2.3which is represented as a DVector containing its diagonal elements, is mul-tiplied to a distributed matrix B and the result DB is a distributed matrix thatwill be later used in Gramian matrix computation (DB)T DB as described inSection 4.1.1. Given that D and B have equal number of partitions, after usingzipPartitions to align corresponding partitions in one place, then ele-ments of each partition of D is aligned with elements of each partition of B viacheckedZip and an element of the result is a Double precision numbermatched by a Vector, therefore, the scalar-vector multiplication is done by35BLAS scal routine. Similar to other distributed computations, iterators playa significant role here. For more details, see SpLinopMatrix.scala in[20].4.1.2 Unified interfaceWith described different implementations for local and distributed vectors and ma-trices, having a unified interface is very convenient for further development. Thisunification can be obtained by the use of Scala implicits [19]. That is, throughimplicits we can manage different implementations of local and distributed casesuniformly. Our choice directly uses Spark-TFOCS design [21] by separating localand distributed implementations into different name spaces and adding packageobject modifier so that they can be visible to all members of our solver [20]package. For more details, see [20].4.2 Benchmark resultsIn this section, we present our benchmark results both for small problems that fit ina single machine in Section 4.2.1, and the large-scale results obtained from clusterof machines in Section 4.2.2.4.2.1 Local modeSpark is designed for large-scale applications and its main advantage is seen whenthe application utilizes the complete parallelization power of a cluster of machines.Therefore, our main purpose is to test our linear programming solver [20] for large-scale problems, however, we have decided to include the local results on smallproblems essentially for two reasons:• Local tests are important for detecting bugs in our program prior to launchingit over a cluster.• Local tests help to some extent in finding computational bottlenecks for ourmain application.36Historically, linear programming problems are presented in Mathematical Pro-gramming System (MPS) file formats. Netlib [18] provides linear programmingproblems at various levels of difficulties in compressed MPS formats. With thehelp of [12] and [21] we parsed some of Netlib small problems in MPS formatand transformed them into the standard linear programming format Equation 2.1suitable for our solver.We have compared the results to the following solvers:• GLPK [10] which is an open-source linear programming solver written in Clanguage and provides the baseline results.• Spark-TFOCS package [21] which is an optimization package imitating Mat-lab TFOCS (Templates for First-Order Conic Solvers) library on top of Sparkand has (`2 regularized) smoothed standard linear programming solver sup-port.Our local hardware specifications are: Intel-Core i7-4810MQ CPU-2.80GHz× 8 and 8GB RAM. As for the software specifications, we have used Scala 2.10.4,Java 1.8, Ubuntu 14.04 operating system, OpenBLAS enabled with the followingGarbage Collection (GC) flags• -XX:MaxMetaspaceSize=3G• -XX:+UseCompressedOops to compress the object pointers to 4-bytes• -XX:ConcGCThreads=8• -XX:+UseConcMarkSweepGCTable 4.1 contains the name of the Netlib problems, number of non-zeros (nn)and execution time measured in seconds. m,n are dimensions of problems and p isthe number of partitions. Also, netlib sol denotes the solution provided by Netliband rel error is the relative error of the “official” solution and the solution found bythe solver.37solver problem name nn m n p nc netlib sol rel error time ratiospark-lpAFIRO 88 27 331 1-464.753142863.24e-9 × 4862 2 3.24e-9 × 490spark-tfocs1 1 1.35e-8 × 173932 2 4.61e-8 × 18813GLPK 1 1 0 × 1spark-lpAGG2 4515 517 3021 1-20239252.3562.21e-11 × 1112 2 2.67e-11 × 231GLPK 1 1 0 × 1spark-lpSCFXM2 5229 661 9141 136660.2615654.20e-12 × 682 2 4.52e-12 × 154GLPK 1 1 0 × 1spark-lpBNL2 16124 2325 34891 11811.23654043.49e-10 × 1502 2 3.47e-10 × 156GLPK 1 1 0 × 1spark-lpMAROS-R7 151120 3137 94081 11497185.16652.06e-11 × 392 2 2.06e-11 × 40GLPK 1 1 0 × 1Table 4.1: Local resultFrom Table 4.1, it is evident that Spark-TFOCS (smoothed) linear programmingsolver is very slow, even for small problems and clearly is not an option for large-scale linear programming problems. We did not include the Spark-TFOCS resultsfor slightly bigger problems because of the very large execution time exceeding the500 seconds threshold.As expected, our Spark-LP solver [20] is much slower when solving small prob-lems than GLPK. The reasons are not only the clear speed differences betweenScala/Java and C languages, but also the inevitable overheads that Spark has fordistributed computations versus sequential ones. One clear observation from thelocal results is that as problems are becoming bigger the execution time ratio be-tween Spark-LP and GLPK is decreasing. That is mainly happening because thecomputations themselves are taking longer thus the overheads slowdown effects arebecoming smaller and for large-scale problems (if tuned properly) they will becomenegligible.The following are Spark’s inevitable computational overheads for distributedlarge-scale computations that have direct effects on our Spark-LP solver beingslower than GLPK on small problems.38• Serialization of lambdas to be distributed via the network to the executors• Deserialization of lambdas inside executors for running tasks• Scheduler delay• Garbage Collection (GC)• Network bandwidthSpark offers many configuration parameters for tuning an application, such asthe choice of fast serialization with Kryo [13] and specifying GC flags. One of thebest ways for choosing a suitable set of parameters is via Spark-UI. It is a commonpractice to use Spark-UI prior to launching an application to a cluster that we willexplain in Section 4.2.1. In fact, the above local results Table 4.1 were obtained bythe following additional configurations• "spark.executor.extraJavaOptions"was set to -XX:NewRatio=6.• "spark.locality.wait" was set to 0. That means, all tasks must runlocally and there is no wait for other less local options.For the complete set of configuration settings, please see [2].Spark-UISpark-UI is a tool which visualizes some of the most important information ofruntime. For example, it shows the specifications of machines that are runningtasks, the number of stages that were completed and are still running. We canfocus on a particular stage or task and see the underlying DAG of operations, theexecution time, serialization and de-serialization time, scheduler delay and the GCtime. As an example, upon solving BNL2.mps as noted before in Table 4.1, wecan see39Figure 4.3: Spark-UI example 1and more specific information about a completed taskFigure 4.4: Spark-UI example 240As seen, when the number of tasks is much bigger than the number of availablecores, the scheduler delay becomes one of the major bottlenecks. Therefore, forlarge-scale applications we have to make sure that the ratio between the number oftasks and the available cores is small enough.4.2.2 Cluster modeHere we will present the results for large-scale problems. The main difficulty inmeasuring the execution time of our solver for large-scale linear programming prob-lems is finding the problems that meet the specification for the number of equationsthat our solver can handle.As explained in Section 4.1.1, due to the fact that the core Java only supportsarrays of 32-bits long, the maximum number of columns in B is limited to 65535.Therefore, we needed to generate our own large-scale linear programming problemsfrom Gaussian distribution N (µ,σ) with mean µ and standard deviation σ . Theproblem creation is controlled by the number of rows n, columns m and sparsitydensity δ ∈ (0,1] parameters.For the cluster resources, we have used [7, Databricks cloud] which uses Ama-zon’s Elastic Computing instances with pre-installed and configured Apache Spark-1.6.1 machines where each machine is memory optimized (r3.xlarge) with 4cores, 30 GB RAM and moderate network bandwidth. Note that it is technicallypossible to use workers having more cores, RAM and much higher bandwidths butat the higher costs. However, at the time of writing this thesis, the r3.xlargeinstances offer the least cost. Also, the number of executors in our clusters variesbetween 16,32 and 64.To obtain the results, we have tested different values for n,m,δ and varied thenumber of partitions p to obtain the complete parallelization of the tasks.The common practice for determining the number of partitions to obtain thecomplete parallelization is 2-3 times the total number of available cores in clus-ter. However, we found out that for our application where distributed matrix bydistributed matrix multiplication has the largest overhead among other parts, evenwhen re-used distributed matrices and vector are cached in memory, if the numberof partitions is high then communications become a huge bottleneck.41We have measured the total execution times as well as other parameters for thebest possible number of partitions by trial and error. Table 4.2 shows the resultsn? m? δ p cores creation (sec) init (sec) avg iter (sec) total (sec)5 3 0.01 8 64 13.0 6.2 2.0 56.35 3 0.01 16 64 29.5 2.1 2.1 50.75 3 0.1 8 64 10.8 7.0 5.1 136.05 3 0.1 16 64 21.1 3.4 1.8 44.66 3 0.01 32 128 37.1 4.0 4.0 205.06 3 0.1 32 128 38.9 6.2 5.9 262.47 3 0.01 64 128 43.2 8.1 19.8 1871.57 3 0.1 64 128 33.0 15.8 29.1 2465.68 3 0.01 1024 256 101.1 60.3 200.9 exceeds5 4 0.01 128 128 37.6 260.2 260.0 7021.75 4 0.1 128 128 37.9 314.7 326.0 8465.36 4 0.01 256 256 77.4 312.3 312.1 9677.26 4 0.1 256 256 76.8 398.9 422.1 13484.5Table 4.2: Distributed results: n?,m? are logarithm values of n,m respectively,δ is the sparsity density, p is the number of partitionsWe required that all convergence measurements, mentioned in Section 2.2.3,must fall under the tolerance of 10−8 and at most the maximum of 100 iterationslimit. In our measurements, the case with n = 108,m = 103 exceeds the 100 itera-tions.For simplicity, we denote each record in Table 4.2 by n?-m?-δ -p no. machines.As an example, 5-3-0.1-16 64m denotes the generated linear programming problemwith n = 105,m = 103, sparsity density δ = 0.1, number of partitions p = 16 and64 machines where one is a master and other 63 are workers.Figure 4.5 illustrates the performance comparisons of the best found results42Figure 4.5: Distributed performance resultsFigure 4.5 shows that increasing m affects the performance more than increasingn. This is due to two reasons:1. Increasing m makes row vectors longer. Therefore, that restricts each workerto store more number of row vectors. Hence, local computations take longer.2. The number of partitions and cores (machines) must increase (in most casesby a factor of 4) thus, the communications take much longer.The same argument justifies the increase in average time per iteration when mbecomes larger, shown in Figure 4.643Figure 4.6: Average iteration timeWe have also observed that as n becomes larger the convergence rate drops forlarge-scale problems as opposed to small-scale problems as shown in Figure 4.7.For further tuning, when visualizing the stages with Spark-UI, we used local-checkpoints that sacrifices some fault-tolerance features for increasing the speed. Infact, it shortens the lineage after every iteration and makes the application DAG alot less complicated. For example, it took 3 seconds to compute Gramian matrixof size 300 GB stored in memory of 32 machines for the case with n = 107,m =103,δ = 0.1, p = 64.In next chapter, we will describe a number of important suggestions for furtherdevelopments.44Figure 4.7: Distributed convergent results45Chapter 5ConclusionsThe goal in this thesis project was to investigate Apache Spark’s capabilities andpotential for scientific computing and high performance computing tasks and as-sess its advantages and disadvantages. For this, we have developed a large-scaledistributed linear programming solver, Spark-LP [20], on top of Apache Spark op-timization module. Spark-LP supports efficient sparse and dense linear algebracomputations, it is open-source, fault-tolerant and can be used on commodity clus-ters of machines.The software architecture design was explained in depth and results were pro-vided in Section 4.2.However, we have faced a number of structural and performance limitations thatwe try to address in the following lists of recommendations:• Our solver [20] only accepts linear programming problems in the standardformat Equation 2.1. One possible extension is to add a preprocessing unitso that more general formats can be supported by transforming them into thestandard format. However, that will add more overheads to the computationswhich we tried to avoid as much as possible until now that we have promisingresults for the standard format case.• Moreover, having a feasibility detection unit is necessary for making it a ma-ture large-scale solver.46• Furthermore, because of the somewhat similar nature of quadratic program-ming it is also feasible to generalize our solver for such problems.• The results of convergent rate shows that Mehrotra’s predictor-corrector al-gorithm, described in Section 2.2.3, for large-scale problems is slower thanwhen it is used for small-scale problems. Therefore, finding an optimal algo-rithm for large-scale problems with our relatively strict settings is an impor-tant challenge for the future work.• To resolve the limitations on the number of columns , described in Sec-tion 4.1.1, it is possible to extend core Java 32-bits arrays to 64-bits with[9, fastUtil] library. But that will result in more inevitable changes in theunderlying Spark optimization module.• Last but not least, we have considered the use of Graphical Processing Units(GPU) to have expected faster local linear algebra computations in the cluster.In fact, Spark can work with [6, CUDA]-enabled GPUs. More specifically,one can use [1, Apache Mesos] as the resource manager to emulate NVIDIA[8, docker] on the Spark cluster.Currently, the best data structure designed for similar use cases is providedin [16, ND4J] library. It is called INDArray and it interplays nicely withApache Spark, when having a heterogeneous cluster of multiple CPUs andGPUs. Therefore, one could use RDD[INDArray] as the underlying dis-tributed structure for such clusters. Note also that INDArrays are 64-bits inlength, then it automatically solves the limitations on the number of columnsdescribed in Section 4.1.1.ND4J uses vectorized C++ code through [11, JavaCpp] library for all numer-ical operations. It handles the stored data off-heap (with its own GC imple-mentation) and it utilizes the optimized native BLAS libraries. The JavaCpplibrary plays a crucial here as it automatically generates the required JNI withzero overhead to access all native codes. However, ND4J is not yet matureenough for high performance scientific computing use cases and further de-velopments are required to add more BLAS and LAPACK supports.47Bibliography[1] Apache Mesos: A distributed systems kernel. http://mesos.apache.org/.December 2016.[2] Apache Spark: Lightening-fast cluster computing. http://spark.apache.org/.December 2016.[3] Apache Spark Source Code. http://github.com/apache/spark.[4] BLAS: Basic Linear Algebra Subprograms. http://www.netlib.org/blas/.November 2016.[5] Breeze: Numerical Processing Library for Scala.http://github.com/scalanlp/breeze.[6] CUDA: Parallel Computing Platform and Programming Model for GPU.https://developer.nvidia.com/cuda-zone. December 2016.[7] Databricks. https://databricks.com. December 2016.[8] Docker: Software Containerization Platform. https://www.docker.com/.December 2016.[9] FastUtil: Fast and Compact Type Specific Collections for Java.http://fastutil.di.unimi.it/. December 2016.[10] GLPK: GNU Linear Programming Kit. https://www.gnu.org/software/glpk/.December 2016.[11] JavaCpp: The missing bridge between Java and native C++.https://github.com/bytedeco/javacpp. December 2016.[12] JOptimizer. http://www.joptimizer.com/. November 2016.[13] Kryo serialization: Java serialization and cloning (fast, efficient, automatic).https://github.com/EsotericSoftware/kryo. December 2016.48[14] LAPACK: Linear Algebra PACKage. http://www.netlib.org/lapack/.November 2016.[15] MPI: A Message-Passing Interface Standard.http://mpi-forum.org/docs/mpi-3.1/mpi31-report.pdf. December 2016.[16] ND4J: N-dimensional arrays and scientific computing for the JVM.http://nd4j.org/. December 2016.[17] Netlib-java: High Performance Linear Algebra (low level).http://github.com/fommil/netlib-java.[18] Netlib Linear Programming Data. http://www.netlib.org/lp/data. November2016.[19] Scala Programming Language. http://scala-lang.org.[20] Spark-LP. https://github.com/ehsanmok/spark-lp. December 2016.[21] Spark-TFOCS: A Spark port of Templates for First-Order Conic Solvers.http://github.com/databricks/spark-tfocs.[22] T. Terlaky C. Roos and J.-Ph. Vial. Theory and Algorithms for LinearOptimization, An Interior Point Approach. John Wiley and Sons, Chichester,UK, 1997.[23] J. Dean and S. Ghemawat. MapReduce: Simplified Data Processing on LargeClusters. Communications of ACM, 2008.[24] G. Farkas. A Fourier-fele mechanikai elv alkalmazasai (in Hungarian).pages 457–472, 1894.[25] Alexey Grishchenko. Distributed Systems Architecture.https://0x0fff.com/spark-memory-management/.[26] D. Anguita J. L. Reyes-Ortiz, L. Oneto. Big Data Analytics in the Cloud:Spark on Hadoop vs MPI/OpenMP on Beowulf. Procedia Computer Science,Volume 53, pages 121–130, 2015.[27] S. Wright J. Nocedal. Numerical optimization. Springer, second edition,2006.[28] T. Das A. Dave J. Ma M. McCauley M. J. Franklin S. Shenker I. StoicaM. Zaharia, M. Chowdhury. Resilient Distributed Datasets: A Fault-TolerantAbstraction for In-Memory Cluster Computing. 2010.49[29] A. Goel R. B. Zadeh. Dimension Independent Similarity Computation. TheJournal of Machine Learning Research, pages 1605–1626, 2013.[30] A. Staple B. Yavuz L. Pu S. Venkataraman E. Sparks A. Ulanov M. ZahariaR. B. Zadeh, X. Meng. Matrix Computations and Optimization in ApacheSpark. 2016.[31] M. Zaharia PhD Thesis. An Architecture for Fast and General DataProcessing on Large Clusters. Electrical Engineering and ComputerSciences University of California at Berkeley, 2014.[32] R.J. Vanderbei. Linear Programming: Foundations and Extensions. KluwerAcademic Publishers, 1997.[33] S. Wright. Primal-dual interior point methods. Society for industrial andapplied mathematics, 1987.50

Cite

Citation Scheme:

        

Citations by CSL (citeproc-js)

Usage Statistics

Share

Embed

Customize your widget with the following options, then copy and paste the code below into the HTML of your page to embed this item in your website.
                        
                            <div id="ubcOpenCollectionsWidgetDisplay">
                            <script id="ubcOpenCollectionsWidget"
                            src="{[{embed.src}]}"
                            data-item="{[{embed.item}]}"
                            data-collection="{[{embed.collection}]}"
                            data-metadata="{[{embed.showMetadata}]}"
                            data-width="{[{embed.width}]}"
                            async >
                            </script>
                            </div>
                        
                    
IIIF logo Our image viewer uses the IIIF 2.0 standard. To load this item in other compatible viewers, use this url:
http://iiif.library.ubc.ca/presentation/dsp.24.1-0340337/manifest

Comment

Related Items