UBC Theses and Dissertations

Transport-Level Transactions: Simple Consistency for Complex Scalable Low-Latency Cloud Applications. Tayarani Najaran, Mahdi. 2015.


Full Text

Transport-Level Transactions: Simple Consistency for Complex Scalable Low-Latency Cloud Applications

by

Mahdi Tayarani Najaran

M.Sc., Computer Science, The University of British Columbia, 2009
M.Sc., Electrical Engineering, Sharif University of Technology, 2007
B.Sc., Electrical Engineering, Ferdowsi University of Mashhad, 2005

A THESIS SUBMITTED IN PARTIAL FULFILLMENT OF THE REQUIREMENTS FOR THE DEGREE OF Doctor of Philosophy in THE FACULTY OF GRADUATE AND POSTDOCTORAL STUDIES (Computer Science)

The University of British Columbia (Vancouver)

August 2015

© Mahdi Tayarani Najaran, 2015

Abstract

The classical move from single-server applications to scalable cloud services is to split the application state along certain dimensions into smaller partitions, each independently absorbable by a separate server in terms of size and load. Maintaining data consistency in the face of operations that cross partition boundaries imposes unwanted complexity on the application. While for most applications many ideal partitioning schemes readily exist, First-Person Shooter (FPS) games and Relational Database Management Systems (RDBMS) are instances of applications whose state can't be trivially partitioned: for any partitioning scheme there exists an FPS/RDBMS workload that results in frequent cross-partition operations.

In this thesis we propose that it is possible and effective to provide unpartitionable applications with a generic communication infrastructure that enforces strong consistency of the application's data to simplify cross-partition communications. Using this framework the application can use a sub-optimal partitioning mechanism without having to worry about crossing boundaries. We apply our thesis to take a head-on approach at scaling our target applications.
We build three scalable systems with competitive performance: a key/value datastore; a system that scales fast-paced FPS games to epic-sized battles consisting of hundreds of players; and a scalable, fully SQL-compliant database used for storing tens of millions of items.

Preface

This dissertation is based on the following publications:

• P1. Mahdi Tayarani Najaran, Shun-Yun Hu, Norman C. Hutchinson, "Spex: Scalable Spatial Publish/Subscribe for Distributed Virtual Worlds Without Borders", 5th ACM Multimedia Systems Conference (MMSys), 2014 [103].

• P2. Mahdi Tayarani Najaran, Norman C. Hutchinson, "Innesto: A Multi-Attribute Searchable Consistent Key/Value Store", International Journal of Big Data Intelligence (IJBDI), 2014 [116].

• P3. Mahdi Tayarani Najaran, Norman C. Hutchinson, "Innesto: A Searchable Key/Value Store for Highly Dimensional Data", 5th IEEE Conference on Cloud Computing Technology and Science (CloudCom), 2013 [117].

• P4. Mahdi Tayarani Najaran, Primal Wijesekera, Norman C. Hutchinson, Andrew Warfield, "Distributed Locking and Indexing: In Search for Scalable Consistency", 5th Workshop on Large-Scale Distributed Systems and Middleware (LADIS), 2011 [119].

• P5. Mahdi Tayarani Najaran, Charles Krasic, "Scaling Online Games with Adaptive Interest Management in the Cloud", 9th Workshop on Network and Systems Support for Games (NetGames), 2010 [104].

Most of the work in the papers is mine as the lead investigator, with my advisor, Norman C. Hutchinson, as a co-author. Shun-Yun Hu was a co-author on P1 who helped with the writing and preparation of the paper, and also helped define evaluation requirements and related work. P4 was done in collaboration with Primal Wijesekera and his advisor Andrew Warfield. Primal helped with the implementation of the MySQL handler, distributed locking, and evaluation.
P5 was done with my previous advisor, Charles Krasic, as a co-author.

Some of the writing of the thesis was taken from the aforementioned papers. Specifically, Chapter 3 draws from P1, P2 & P3, Chapter 4 from P2 & P3, Chapter 5 from P1, and Chapter 6 from P4. Additionally, Chapter 6 uses text and evaluation results from Primal's thesis [125], from whom permission has been obtained.

Table of Contents

Abstract
Preface
Table of Contents
List of Tables
List of Figures
Acknowledgements

1 Introduction
  1.1 Online Games at Scale
  1.2 Classical Cloud-ification
  1.3 Partitioning the Unpartitionable
  1.4 The Thesis
  1.5 CAP Revisited
  1.6 Contributions
    1.6.1 One-Round Transactions
    1.6.2 Key/Value Store
    1.6.3 Scalable Spatial Publish/Subscribe
    1.6.4 Partitioned SQL
  1.7 Roadmap

2 Background & Motivation
  2.1 Consistency
    2.1.1 Why Strong Consistency?
    2.1.2 ACID
  2.2 Transport-level ACID
    2.2.1 One-Round Transactions
    2.2.2 Cloud Unfriendly Applications: A Different Approach

3 One-Round Transactions
  3.1 API
    3.1.1 Client
    3.1.2 Server
    3.1.3 Sample Transaction
  3.2 Commit Protocol
    3.2.1 Lock Mode
    3.2.2 Clock Mode
    3.2.3 Lock vs. Clock
  3.3 Durability & Failure Recovery

4 Innesto
  4.1 Introduction
  4.2 Overview
    4.2.1 Data Model & API
    4.2.2 Architecture
    4.2.3 Strong Consistency
  4.3 Multi-Version Storage (MVS)
    4.3.1 Partitioning
    4.3.2 Non Structure-Modifying Operations
    4.3.3 Structure-Modifying Operations
    4.3.4 Partition Hierarchy
  4.4 Search
    4.4.1 Search Clones
    4.4.2 Sparse Clones
  4.5 Extended API
    4.5.1 Operation Transactions
    4.5.2 Conditional Operations
    4.5.3 Extensions
  4.6 Evaluation
    4.6.1 Key/Value Performance
    4.6.2 Load Balancing
    4.6.3 Clone Cost
    4.6.4 High Dimensionality
    4.6.5 Lock vs. Clock
    4.6.6 Summary
  4.7 Related Work
    4.7.1 Cloud Storage & Key/Value Datastores
    4.7.2 Transactional Datastores
    4.7.3 Peer-to-Peer (P2P)
    4.7.4 Distributed Transactions
  4.8 Conclusion

5 SPEX
  5.1 Introduction
  5.2 Virtual Environments
  5.3 Overview
    5.3.1 Dynamic VE Distribution
    5.3.2 Consistency & Fault Tolerance
    5.3.3 Adaptive Load Balancing
  5.4 SPS Functionality
    5.4.1 Spatial Publications
    5.4.2 Spatial Subscriptions
  5.5 Implementation Details
    5.5.1 Spatial Partitioning
    5.5.2 Scalable Pub/Sub
  5.6 Discussion
  5.7 Evaluation
    5.7.1 Processing Latency
    5.7.2 Bandwidth
    5.7.3 Accuracy
    5.7.4 Cross-Continent Latency
    5.7.5 Stability
    5.7.6 Synchronization Costs
  5.8 Related Work
    5.8.1 Commercial Deployments
    5.8.2 Query/Broadcast-based
    5.8.3 Partitioning
    5.8.4 SPS
  5.9 Conclusions and Future Work

6 iEngine
  6.1 Introduction
  6.2 Current RDBMS Architectures
    6.2.1 Query Processing
    6.2.2 Storing & Indexing
    6.2.3 Isolation
    6.2.4 Distribution Challenges
  6.3 iEngine
    6.3.1 Query Processing
    6.3.2 Storage Handler
    6.3.3 Transaction Handler
    6.3.4 Correctness
  6.4 Range Locking
    6.4.1 Locks & Deadlocks
    6.4.2 Fairness
    6.4.3 Timeout Tuning
  6.5 Anatomy of a Transaction
  6.6 Fault Tolerance
    6.6.1 Server Failure
    6.6.2 Engine Failure
    6.6.3 Transaction Handler Failure
  6.7 Evaluation
    6.7.1 TPC-C
    6.7.2 Strong Scaling
    6.7.3 Contention
    6.7.4 Web Compatible Workload
    6.7.5 Crash Consistency Cost
  6.8 Related Work
    6.8.1 Data Partitioning
    6.8.2 Distributed Indexing
    6.8.3 Distributed Locking
    6.8.4 NoSQL
    6.8.5 Scaling Storage
    6.8.6 NewSQL
    6.8.7 Scalable Transactional Models
  6.9 Conclusion & Future Work

7 Future Work: Argal

8 Conclusion

Bibliography

List of Tables

Table 2.1 Possible anomalies of inconsistent systems [110].
Table 2.2 Order of operations a, b and c observable by every thread under different consistency models (assuming b causally depends on a).

Table 3.1 Description of message fields used in the commit protocol.

Table 4.1 Innesto's API, which includes basic key/value operations plus search.

Table 4.2 Innesto's latency and storage requirements for storing N items with different configurations.

Table 4.3 YCSB workload specification in terms of operation mixtures and access distribution. By default, each item has ten attributes with a total size of 1KB.

List of Figures

Figure 3.1 Transaction commit protocol.

Figure 4.1 Logical architecture of Innesto components. Each component is fully distributed.

Figure 4.2 Spatial partitioning and distribution of a table. The root partition covers the entire key space and each partition keeps references to child/parent partitions. Only leaf (gray) partitions store the actual data. Partitions are distributed between servers for load balancing.

Figure 4.3 Search Clones are used to partition data based on secondary attributes. Items are grouped into attribute groups to better support range search. Each Ki is a full copy of an item.

Figure 4.4 Average throughput for YCSB benchmark. The first three (red) workloads are write heavy, the middle three (gray) are read heavy, and the right-most workload (black) is search heavy.

Figure 4.5 Temporal throughput in Load phase of the YCSB benchmark. Notice how HyperDex's performance decreases with dataset size while Cassandra and Innesto maintain a steady throughput.

Figure 4.6 Average Innesto throughput for YCSB benchmark with different configurations.
Figure 4.7 Average throughput for write-only workload of inserting 5 million items with varying number of data dimensions.

Figure 4.8 Average throughput for YCSB benchmark for Innesto in lock mode and clock mode. Compared to lock mode, clock mode has on average 31.8% lower throughput.

Figure 4.9 Average throughput of Innesto in lock mode and clock mode using the hotspot distribution. The x-axis shows the probability of accessing a hot item.

Figure 4.10 Average throughput of Innesto in lock mode and clock mode using the hotspot distribution with a fixed access probability of 95% with varying numbers of hot items.

Figure 5.1 Basic architecture of SPEX. All communications are done using one-round transactions.

Figure 5.2 (left) Full partitioning vs. (right) Progressive partitioning.

Figure 5.3 Distributed dynamic partition hierarchy.

Figure 5.4 99th percentile SPEX processing delays in the testbed.

Figure 5.5 99th percentile SPEX processing delay in the testbed with different update rates at map size 1024×1024.

Figure 5.6 SPEX host and actor average bandwidth in the testbed.

Figure 5.7 SPEX accuracy in the testbed.

Figure 5.8 End-to-end delay across the continent in EC2.

Figure 5.9 SPEX performance over a period of 1 hour of run time with random waypoints.

Figure 5.10 SPEX performance over 1 hour of run time with VE size of 256×256 and different numbers of hotspots.

Figure 5.11 Cost of using one-round transactions for global synchronization instead of simple message passing.

Figure 6.1 Conventional workflow in an RDBMS.

Figure 6.2 General architecture of iEngine.
Figure 6.3 Distributed storage layer architecture.

Figure 6.4 MySQL architecture.

Figure 6.5 Range lock example. The numbers represent the reference count of each fragment. Note that the partitions and locks could have any number of dimensions.

Figure 6.6 Red are existing range locks and gray are pending range lock requests. (left) Order in which range lock requests arrive. (right) Equivalent lock dependency graph.

Figure 6.7 Strong scaling of iEngine.

Figure 6.8 User contention.

Figure 6.9 Web compatible workload.

Figure 6.10 Crash consistency.

Figure 7.1 Architecture of Argal.

Acknowledgements

I would like to thank my advisor Dr. Norman C. Hutchinson for his support and guidance throughout the course of developing this thesis. His feedback and careful eye for detail proved very helpful.

Special thanks to my wife Sara for her support and motivation, without which I might have never reached the end.

Thanks to all collaborators, project members and NSS friends.

Chapter 1

Introduction

1.1 Online Games at Scale

Online games are a billion-dollar industry which provides experiences ranging from simple pass-time puzzles to million-dollar professional tournaments [16]. Depending on the genre of game, scaling online games has different interpretations. Player vs. Environment (PvE) games focus on the interactions between the player and the game in the form of puzzles, tasks or quests. Players can only interact with each other asynchronously at the meta-game level.¹ Player vs.
Player (PvP) games, on the other hand, allow synchronous interactions between multiple players.² Each game instance is a head-to-head challenge between a fixed number of players. In both PvE and PvP the service host acts as a communications medium between the players, performing match making and storing state. Scaling such games entails supporting more instances of the game to achieve higher online user counts.

First-person shooter (FPS) games are a popular form of PvP where each player observes a virtual world through the eyes of an avatar while interacting with other players in real time. FPS games have the potential to fulfill the excitement and thrill of participating in epic battles, ones that we can currently only observe in movies such as Avatar, Lord of the Rings and Star Wars. Scaling FPS games to allow more players to participate in the same game instance has become an active area of research.

¹ Farmville [18] is a game where players individually build their own farm and can send/receive gifts and favors from their friends on social networks.
² Clash of Clans [12] is a fort-building game where players can attack another player's fort or have to defend themselves from attacks. Dota2 [15] is a team-based game where each fixed-size team of players has to destroy the other team's stronghold.

1.2 Classical Cloud-ification

In the era of cloud computing, the cloud has emerged as the dominant solution for large-scale online services and big data applications. With scalability in mind, the cloud has presented an appealing hosting environment for pay-as-you-go, always-available online services. The first version of our work found commercial virtualized cloud environments (Amazon EC2 [3]) to be suitable for hosting FPS games at such scale, with a very appealing model of gaming-as-a-service [104].

The classical move from single-server applications to the cloud is to split the application state along certain dimensions into smaller partitions absorbable by each cloud server in terms of size and load. Each partition is independently managed by a single server. As long as each operation can be contained in a single partition, data consistency may be enforced by its owner and race conditions safely resolved. Thus the key feature of a good partitioning criterion is that it results in the fewest cross-partition operations. Enforcing consistency on cross-partition operations requires following explicit protocols between the servers and adds unwanted complexity to the application.

For most applications many such criteria readily exist. Unfortunately, FPS games are not like most applications. Players have a virtual position inside the game, but the range of effect they can cause is arbitrarily defined by the weapon they carry. Thus for any partitioning criterion there exists a weapon that results in cross-partition operations. Additionally, unlike video data which is encoded once and served to users without modification, the state of the game is frequently and regularly modified by the players. Compared to social media applications, FPS games have a much higher write/read ratio in their workload. Players need to observe an almost consistent view of the game over wide area network (WAN) latencies; their views start to diverge as latency increases [67], losing the game's appeal. The interactions between the players are fast and unpredictable, and the system cannot know a priori what a player will do next. Since the game is essentially a graphical
Since the game is essentially a graphical2representation of state, data inconsistencies translate into bizarre visual artifactsand can’t be silently swept under the carpet.1.3 Partitioning the UnpartitionableWithout an optimal partitioning scheme, an epic-scale FPS game is faced with oneof two options. First, partition the players between the servers and limit them totheir current partition. This will eliminate the need for cross-partition operations,but the limited feature set is visible to the players and reduces the quality of thevirtual experience. Players can only see a subset of other players and interactionsare only possible with objects and players in the current partition.The second option is to partition the state to provide a rich virtual experienceand deal with frequent cross-partition operations and constantly changing parti-tions. Related work in this area have successfully achieved scalability via the firstoption (e.g., [33]) despite its limitations. Currently, any other attempts to circum-vent interaction limitations by using the second option have resulted in relaxingdata consistency down to eventual-consistency. Considering how players invest asignificant amount of time and money in a game, we consider eventual not goodenough. Inconsistencies can directly be mapped to monetary losses. In this thesiswe take the second option but aim to provide strong consistency.1.4 The ThesisAn application node is a processing unit in charge of a partition of the applica-tion state which could be anything from a single virtualized core to a large muli-processor computing cluster. No matter what the application nodes are, there ex-ists a scale at which a single node is no longer sufficient and multiple nodes arerequired. It is up to the application to now deal with sharing state across partitionsin a consistent manner. 
The nodes need to be aware of the existence of others when performing their computations and need to somehow communicate their events.

In distributed systems the transport layer serves as the lowest level in the application stack. Applications use different forms of communication between components in the form of remote procedure calls (RPC). An RPC may use standard requests, e.g., HTTP, REST [79], SOAP [37], or proprietary protocols, e.g., RTMP [32], Thrift [47], protocol buffers [121]. Regardless, these are designed for point-to-point communication and provide no means to deal with one-to-many communication or synchronization. As a result, higher layers in the application stack are forced to deal with complicated data consistency scenarios when the application logic mandates one-to-many communication or there is concurrency on the data.

This thesis proposes that it is possible and effective to provide unpartitionable applications with a generic communication infrastructure that enforces strong consistency of the application's data to simplify cross-partition communication. Our approach to providing data consistency is to replace the normal transport with one that explicitly deals with data consistency. Instead of a simple message passing API, the unit of communication is the one-round transaction. Our transactions are one-round since they provide the same abstraction as one round of message passing from source to destination, and they are a form of transaction as they allow atomically grouping and delivering messages to multiple recipients, always keeping the data in a consistent state. All subsequent components of the application are then built in this framework while having the freedom to fall back to normal message passing when consistency is not required. Using this framework the application can use a sub-optimal partitioning mechanism without having to worry about crossing boundaries.
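As a rough sketch (not the actual API, which Chapter 3 presents), a one-round transaction can be modeled as a batch of writes to several partitions, guarded by compare items that record the state each message was computed from; either the whole batch takes effect or none of it does. All names below are hypothetical, and the partitions are in-process objects standing in for remote servers.

```python
# Hypothetical sketch of a one-round transaction over partitioned state,
# in the spirit of Sinfonia-style minitransactions; names are illustrative.

class Partition:
    """One application node's slice of the application state."""
    def __init__(self):
        self.data = {}

class OneRoundTxn:
    """Atomically deliver a batch of writes to many partitions,
    guarded by compare items on the values used to compute them."""
    def __init__(self):
        self.compares = []   # (partition, key, expected_value)
        self.writes = []     # (partition, key, new_value)

    def compare(self, part, key, expected):
        self.compares.append((part, key, expected))
        return self

    def write(self, part, key, value):
        self.writes.append((part, key, value))
        return self

    def commit(self):
        # Every touched partition checks its compare items; all must
        # pass or the whole batch aborts with no visible effect.
        if any(p.data.get(k) != exp for p, k, exp in self.compares):
            return False
        # Apply all writes atomically (a real system would hold per-item
        # locks or use timestamps across this step, per Chapter 3).
        for p, k, v in self.writes:
            p.data[k] = v
        return True

# Usage: move a player between two partitions in one consistent step.
west, east = Partition(), Partition()
west.data["player42"] = "alive"
ok = (OneRoundTxn()
      .compare(west, "player42", "alive")   # state the message was computed from
      .write(west, "player42", None)        # leave the old partition
      .write(east, "player42", "alive")     # appear in the new partition
      .commit())
```

In a real deployment the compare/apply step would run as a single commit round across the touched servers, which is where the "at most one extra round trip" cost discussed below comes from.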
Using one-round transactions in this way removes an excessive amount of complexity from the end application, which no longer has to deal with enforcing isolation, concurrency, and fault tolerance and recovery.

One-round transactions provide a simple programming model which transforms the non-trivial task of protocol design into the much simpler task of data structure design [49]. In essence, each message to be passed to other nodes only needs to consider what parts of the application's state it is touching and what parts of the state were used to compute the message. This single guideline is generic, as it does not require knowledge of other components which may be carrying out conflicting operations. It eliminates the need for the programmer to enumerate all possible permutations of actions on a per-message-type basis.

Frameworks for consistent distributed objects date back at least to the ISIS system [58]. Yet a key factor in the applicability of a framework is the amount of effort required to migrate existing complex applications to it. Some well-known single-node applications such as MySQL [28] cannot be trivially converted to adopt a different design architecture.

Modifying an existing application to use the one-round transaction framework is as simple as changing the transport layer without touching the rest of the application. One-round transactions will merely operate as the original transport would have done, with no change to network performance complexity. Using one-round transactions for isolation and consistency requires some extra help from the application to interpret the contents of each message, and adds at most an extra network round-trip time to each message. Providing this information is a simpler task than having to re-architect the application's data model, and the performance overhead is predictably bounded. Hence, we consider migrating to one-round transactions to be simpler than using other frameworks. Our framework has allowed us to take a direct approach at building systems which were already too complicated to deal with consistency and scalability at the same time.

1.5 CAP Revisited

A system simultaneously seeking scalability and consistency immediately brings the CAP theorem [84] to mind. At first glance, the CAP theorem says distributed applications must choose two out of: consistency (C), availability (A), and the ability to tolerate network partitions (P). This rationale has been used to justify ranking consistency last, making the relaxation of data consistency down to eventual consistency in favor of availability a generally acceptable approach.

A deeper look at the CAP theorem shows this trade-off to be greatly misunderstood [9]. Partitions in the network are unavoidable, but rarely happen. Also, availability is often treated as binary, yet in reality there is a spread of availability from all to nothing. The key insight here is that the CAP theorem holds at the micro scale, but taking a system as a whole, there doesn't have to be a complete loss of availability at the macro scale in the face of partitions. Thus a system may be consistent with some form of availability at all times, even during partitions [9]. CAP doesn't contradict our design goals, and we deem this model of operation a reasonable trade-off for our framework.

1.6 Contributions

1.6.1 One-Round Transactions

Using one-round transactions to build consistent distributed systems was inspired by Sinfonia [49]. However, Sinfonia provides limited functionality for the back-end servers (memnodes), forcing computation onto the front-end. This makes computations such as read-modify-write very inefficient. We present three different implementations of one-round transactions in Chapter 3, each suitable for a different purpose.
Lock-based isolation was inspired by Sinfonia [49] but providesimprovements to the API for better performance, and was developed independentlyfrom Granola [72] from which we take clock-based synchronization. One-roundtransactions serve as the basis for the rest of our applications.1.6.2 Key/Value StoreMuch of the success of large-scale Internet services can be attributed to key/valuestores which serve as the backbone of cloud platforms. The same common struc-tures of web services which use stateless front-end servers that indirectly commu-nicate through a scalable key/value store may be applied to FPS games. However,using existing key/value stores in our target application demonstrates that they fallshort on one or more required features.Data retrieval patterns of FPS games include a large number of multi-dimensional range queries which look for objects in the virtual environment withincertain areas. Most key/value stores don’t provide such a functionality. This meansthe gaming system would have to implement its own overlay to maintain suchinformation which will be inefficient compared to a natively supported search ca-pability. The dominant data model supported by key/value stores, i.e., eventualconsistency, does not satisfy our design goals. Finally, multi-operation functionscaused by the game workload are inevitable as we do not intend to limit the scopeof effect of the players.Considering we found no key/value store to simultaneously provide all of ourrequired features, we implemented our own in Chapter 4. Innesto [116, 117] is astrongly consistent distributed key/value store. Innesto uses one-round transactions6to provide a data partition abstraction with range keys. Partitions are automaticallydistributed to balance load, while allowing different partitioning schemes of thesame data to co-exist for improved data retrieval. 
As part of its API, Innesto provides searching for data on multiple attributes, supports atomic grouping of key/value operations, and provides an extensible API to implement custom application-defined operations.

1.6.3 Scalable Spatial Publish/Subscribe

On its own, Innesto provides half the functionality needed by an FPS game, namely searching for objects with specific criteria. It lacks the other half: tracking them once they're found. In Chapter 5, SPEX [103] combines spatial partitioning provided by Innesto with a hierarchy-based publish/subscribe mechanism to support spatial publish/subscribe (SPS) [86].

SPS is the minimal abstraction that allows decoupling game logic from the infrastructure hosting it [86]. In essence, SPS requires the ability to publish or subscribe to any region of a virtual environment. By definition, SPS can't tolerate being limited to a single partition. With the aid of one-round transactions, SPEX ensures data consistency is preserved for interactions that span more than a single object. SPEX is the first fully distributed strongly consistent SPS system and can scale to hundreds of players in a single virtual environment. In our experiments we have reached a scale of 750 players, as permitted by our hardware resources.

1.6.4 Partitioned SQL

Another type of application whose data isn't trivially partitioned is the Relational Database Management System (RDBMS). The tight integration between the components of a traditional RDBMS makes it extremely hard to distribute. This has often required redesigning the entire RDBMS and has resulted in sacrificing parts of its SQL capabilities [13, 29, 43, 51]. SQL transactions could potentially touch any table in arbitrary order. Once again, the application has to either limit what the transactions can touch to a single partition, or support SQL to its full extent and deal with the consequences. As before, we take the latter approach.

Chapter 6 presents iEngine [119, 125], a full-SQL-compliant distributed RDBMS.
Data is stored inside Innesto for efficient indexing and multi-attribute range queries. An enhanced version of Innesto designed for distributed locking is used for isolating database transactions using one-round transactions. Query planning and transaction execution are done using unmodified centralized RDBMSs (e.g., mySQL) which are given access to the data through a standard storage interface. While Innesto scales out to provide higher storage throughput, iEngine enables a new dimension of scalability by allowing the application to plug in any number of RDBMS engines for increased data processing throughput.

1.7 Roadmap

We deem data consistency in distributed systems a problem that originates at the transport level and escalates up through the application stack. When the stack is deep enough, consistency becomes too complex a problem to deal with. We provide a background on data consistency in Chapter 2. Our solution is to provide a means of dealing with consistency at the transport level with one-round transactions, which allow turning consistency on and off, explained in Chapter 3. While one-round transactions aren't anything new, we use them to take a head-on approach at scaling applications which can't be trivially partitioned. Innesto is the component that uses one-round transactions to abstract away partitions and boundaries, relieving upper layers from having to deal with crossing them (Chapter 4). In Chapter 5, SPEX uses this to safely implement area publications and subscriptions, knowing Innesto will correctly handle ordering of events. Finally, in Chapter 6, iEngine takes advantage of the boundary-less abstraction provided by Innesto to give a single-node RDBMS the familiar feel of being in control of the data it stores.

The two applications built in this thesis each demonstrate a different key feature of one-round transactions. The techniques used can more or less be applied to situations where more than a single processing node is needed.
iEngine represents the simplicity of porting a single-node application to a distributed setting with minimal changes. The composability of transactions allowed us to glue together operations such as committing a transaction and write-ahead logging. SPEX presents how strong consistency can easily be enforced in an unpartitionable application such as FPS games.

Chapter 2

Background & Motivation

2.1 Consistency

In distributed systems, multiple threads of execution operate on the state of the application. The threads could belong to different processes on different machines. The data they touch may also be hosted on other machines, rendering low-level synchronization primitives such as locks and semaphores useless. A thread has to take one or more steps to complete each logical application task. We call the group of operations done by a single thread to complete each task a transaction. A transaction may involve any or all of reading data, performing computations and writing data, in arbitrary order.

With multiple concurrent transactions operating on the same data, different data consistency models are possible based on how the transactions are executed. The data consistency model defines what different concurrent transactions observe while executing. For example, if multiple different transactions read the same recently modified data item, will they all see the same newest value or will some transactions observe the old value? If a single transaction reads an item multiple times, will it consistently observe the same value or could it possibly see different values as a result of the execution of other transactions? Finally, if two transactions modify the same data item at the same time, which one will win (i.e., make the final modification)?
Table 2.1 lists a few anomalies possible in concurrent distributed systems.

Table 2.1: Possible anomalies of inconsistent systems [110].

Dirty read: A transaction reads an update made by another unfinished transaction.
Non-repeatable read: A transaction performs multiple reads on the same data but gets different results due to concurrent modifications.
Lost update: Two transactions make concurrent updates to the same data and the results of one are overwritten by the other.
Conflicting fork: Two concurrent transactions make conflicting updates, causing the state to fork and requiring application logic to resolve.

Table 2.2: Order of operations a, b and c observable by every thread under different consistency models (assuming b causally depends on a).

Eventual: any of a → b → c, b → c → a, c → a → b, a → c → b, c → b → a or b → a → c
Causal: any of a → b → c, a → c → b or c → a → b
Sequential: one of a → b → c, b → c → a, c → a → b, a → c → b, c → b → a or b → a → c
Serializable: a → b → c

Various consistency models exist, defined by how different threads in a system observe the results of operations executed by others. We present a simple example in which three operations a, b and c happen in a distributed system around the same time and in that exact order. Also assume b causally depends on a, i.e., it has read the result of a to perform its computations. Table 2.2 presents the possible observable orderings under different consistency models.

In the weakest form, with eventual consistency [123], any ordering is observable for the operations. This means some thread might observe a computation (i.e., b) using some computation it hasn't seen yet (i.e., a). In a stronger model, causal consistency [11] preserves the order of causally dependent operations. Hence, no thread ever observes b before observing a.
However, it is up to the application to ensure dependencies are correctly conveyed to the component that provides causal consistency. With sequential consistency [35], all threads observe the exact same ordering of events globally, but it might differ from the actual ordering that happened in real life. Only with serializability [36] (strong consistency) do all threads observe the exact same order in which the events actually occurred.

2.1.1 Why Strong Consistency?

From a programmer's perspective, strong consistency in a distributed system is easy to deal with. It hides away the complexities of the distributed system as if the system were single-threaded. The developer only needs to focus on the task at hand without requiring in-depth knowledge of implementation details of the system. Invariants are easily maintained without requiring full knowledge of what they are.

Without consistency guarantees, extra effort is required to ensure the application functions properly. This requires detailed a-priori knowledge about the application state and how it is modified. Regardless of the transport protocol used, requests in the system may be re-ordered at any point. A write to a data item followed by a read by the same thread on the exact same item may be executed out of order. As a result, the thread may not read what it just wrote.

Even if single-event orderings are preserved, there is still no guarantee about orderings of groups of operations (transactions). Numerous interleavings of operations are possible, which could result in drastically different results for the exact same transactions. There is no way for the developer to reason about concurrency. For example, when a read-modify-write transaction is executing, there is no way to ensure the read data hasn't changed while the modify and write operations were in progress.

Such problems have forced cloud providers to enhance their component APIs with system support for dealing with some of these limitations.
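One common form of such support is a conditional update that applies only if the stored value still matches what the caller previously read. Below is a minimal sketch of the idea against a hypothetical single-node in-memory store (the class and method names are our own illustration, not any specific vendor API):

```python
class Store:
    """Hypothetical in-memory key/value store with a conditional put."""

    def __init__(self):
        self.items = {}

    def get(self, key):
        return self.items.get(key)

    def conditional_put(self, key, expected, new):
        # Apply the write only if the stored value still matches what the
        # caller read earlier; otherwise reject the stale update.
        if self.items.get(key) != expected:
            return False
        self.items[key] = new
        return True

store = Store()
store.conditional_put("balance", None, 100)        # initial write succeeds
stale = store.get("balance")                       # two clients both read 100
store.conditional_put("balance", stale, 150)       # first writer wins
ok = store.conditional_put("balance", stale, 120)  # second writer is rejected
```

The second writer's update is refused because it was based on a value that has since changed, which is exactly the lost-update protection discussed above; a real service would additionally have to make the compare step atomic across replicas.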
Amazon's SimpleDB [6], an eventually consistent datastore, added a consistent read operation which returns a result that reflects all writes that received a successful response. They also added conditional put and conditional delete. These operations only execute to completion if the data item has the specified expected value (the condition); otherwise they abort [7]. This helps protect items from concurrent modifications by rejecting modifications that were based on stale values.

However, making each component in a distributed system consistent on its own will not solve everything. A transaction that needs to operate on multiple data items simultaneously still faces the concurrent modification and interleaving problems. It has no control over the ordering of operations on different machines. A conditional operation on one data item can ensure its value hasn't changed, but since the execution time of the other operations on the other machines is unknown, it can't be sure the other values haven't been modified.

Finally, failures can happen at any time. Even if individual operations are durably protected from failures, if the thread executing a transaction fails midway, it leaves the consistent components in an inconsistent state.

2.1.2 ACID

Using transactions to build services is nothing new. For decades RDBMSs have used ACID [2] transactions to provide strong consistency:

• Atomicity: Operations in a transaction execute as all or nothing. A transaction only completes if all of its operations succeed. No intermediate state is ever observable in the system.

• Consistency: A transaction takes the state of the system from one consistent state to another. No kind of system failure should result in inconsistent state.

• Isolation: Different concurrent transactions are isolated from each other as if they were executed serially, i.e., one after another.
The effects of an incomplete transaction should not interfere with another transaction's computations.

• Durability: The effects of completed transactions will persist through any kind of outages and failures.

Unlike locks, transactions are composable. Two different operations built using transactions can be combined together. By using the same transaction they provide correct concurrency isolation. Durability simplifies application development. All the developer has to do is ensure important state is stored and modified using transactions. No extra failure scenarios have to be considered.

Database transactions are a powerful tool, yet their implementation is extremely complicated. Distributing them is an even more challenging task. It is theoretically impossible to globally serialize concurrent database transactions in a lock-free manner. Scalable RDBMSs that provide full-fledged ACID transactions only scale up by adding more expensive hardware with immense licensing costs.

RDBMS solutions offered by cloud providers [4] are still single-node servers. To scale out, an application has to use multiple different instances of the RDBMS, each one for a different purpose. Consequently, the transactions live in different worlds, each used for building different individual services. They can't be used together to build the entire system. Such limitations have forced applications to use a hybrid architecture: transactions in the back-end for important application tasks, and eventual consistency in the front-end [25].

2.2 Transport-level ACID

The benefits of ACID transactions are too important to ignore when building distributed systems. Thus we take a different approach. Instead of trying to distribute traditional ACID database transactions, we simplify them down to one-round transactions provided at the transport level.
The rationale is that a transport capable of providing some form of ACID transactions at scale with reasonable performance could serve as the foundation of scalable large-scale applications.

2.2.1 One-Round Transactions

Database transactions can have multiple rounds of reading and writing different data items between start and commit. The multiple rounds make it impossible to serialize them in a lock-free manner, as there is no generic way to predict what the transaction will do next. One-round transactions only allow a single round of communication, initiated by commit. There is no limitation on the number of items that can be read or written by the transaction. The only limit is that they need to be known at commit time.

Compared to using an ordinary transport, the application using one-round transactions is exposed to the same API and message receiving handler (invoked on receipt of each message). What's added is another handler, invoked before the receiving handler, used for checking consistency requirements. The application can use this handler to define its consistency rules, or simply ignore it to forfeit the consistency provided by one-round transactions.

The simplification of transactions provides these benefits:

• Less-lock/Lock-less serialization: In the worst case, committing a one-round transaction takes 2 network round trip times (RTT) (plus a durable log time or extra RTT for replication) to complete. If locking is used to deal with concurrency, locks will generally be held for a short time, providing better performance under contention than long-lived locks.

One-round transactions can also be isolated by using clock vectors for lock-free serialization. Without locks, none of the overheads of synchronized execution, lock failures and random back-off are present.
Thus one-round transactions can perform well under extreme contention (see Chapters 3 and 4).

• Message-passing fallback: While transactions may be a suitable solution for certain components of a system, not all components might be interested in the extra overhead of the commit protocol and durability. Fortunately, the commit protocol executes in a single RTT under certain conditions. The application is given the freedom to choose the right balance of performance, durability and consistency.

2.2.2 Cloud Unfriendly Applications: A Different Approach

Our new transport based on one-round transactions provides consistency with the desired level of performance. This allows us to use one-round transactions to implement the needed components for our target applications.

Online FPS Games

FPS games are a popular genre where the player observes a virtual environment (VE) through the eyes of an avatar they control. Online FPS games use frequent message passing between different replicas of the VE to synchronize what the players observe. For example, in a server/client architecture, all players send their commands to the server and the server keeps them all up to date with the latest state of the game via frequent update messages. Previous studies have found the enjoyability of the game to be very sensitive to end-to-end latency: the delay between when one player takes an action and the time it is observed by other players in the game. Latencies of 150ms represent the border of enjoyability [67]. Anything more than that will drastically degrade the quality of the game.

Online FPS games present an interesting class of application. The state of the VE is shared between all players in the game. It consists of the objects in the VE, including player avatars. The actions of a player are translated into interactions between the avatar of the player and its target objects, such as other players. Players are continuously moving and interacting with the VE during the game.
As a result, the workload of an online FPS game is heavily write-oriented. This is in contrast to other types of online application, whose workloads usually have a high read/write ratio (sometimes as high as 30 to 1 [50]).

The game is sensitive to end-to-end latencies. In many scenarios, traditional techniques for dealing with concurrency, e.g., locks, and data retrieval, e.g., range queries, will not provide the desired performance. The players controlling avatars are manipulating the objects remotely, usually over WAN distances. What a player observes when taking an action will differ from the latest state of the VE on the server due to propagation delay. Inconsistencies need to be dealt with correctly and fairly.

The players have unpredictable interactions with other objects. Unlike most online applications that can serve users from the nearest replica, geographic proximity has no meaning in online FPS games. Even in the VE, players don't necessarily interact with the players virtually closest to them. Various factors contribute to how players select their targets [55]. Interactions are also restricted based on system-imposed policies, such as constraints in a physics simulation system. The interactions are not only unpredictable, but also vary over time as the active battle grounds shift [99]. This prevents a clean partitioning scheme for the state of the VE.

Online gaming is a billion dollar industry. Players invest months of their time building large armies to live the thrill and excitement of participating in battle. World of Warcraft [46] has reported over 5 million monthly subscribers [124]. EveOnline [17] has recently reported over $300,000 USD of in-game ships and equipment destroyed in a single 21-hour battle [39].

Existing state-of-the-art systems do not yet live up to the full potential of epic-scale gaming (see Chapter 5 for a survey). Sharded systems, e.g., World of Warcraft, limit the interactions of players to a subset of others.
Partitioned systems, e.g., Second Life [33], limit the interactions of players to their immediate surroundings in the VE. Systems that suffer from neither limitation, e.g., EveOnline, sacrifice consistency. The players currently face one of two options: either participate in a large yet inconsistent VE and risk their assets on good faith, hoping they'll end up on the good side of concurrency conflicts, or settle for a smaller, more limited battle with fewer players. Overcoming these limitations would not only broaden the horizon of FPS gaming entertainment, but would also make way for large-scale sports activities and social exhibits.

Beyond games, the model of the system is generic enough to be extended to other domains. Imagine replacing the game client with a browser, game objects with user profiles, and physics restrictions with friendship connections to form a real-time social networking service, one in which, instead of dealing with a soul-less wall, users more dynamically interact with their friends. Thus solutions presented for epic-scale FPS games may be used for other purposes.

Databases

For decades, databases have served as the data storage component of computer systems and applications. Years of research have perfected the tasks of data layout, query planning and synchronization while providing SQL as the universal language for accessing the data. SQL simplifies interactions with the data as it allows a very broad set of possible questions to be asked [45]. It has enabled an ecosystem of management and operator tools to help design, monitor, inspect, explore, and build applications for enterprises. A SQL programmer can reuse their API and UI knowledge across multiple back-end systems, which reduces application development time.

Traditional relational databases, including those from Oracle and IBM, are built with a share-everything centralized topology. This results in tight coupling of internal components. This doesn't mean they don't scale.
Rather, when it comes to scalability, they scale up. Throughput is only improved by adding more expensive hardware with immense licensing costs, as opposed to scaling out by combining multiple cheaper hardware units to achieve the same result.

Finding distributed alternatives to mathematically proven properties of centralized SQL engines is a non-trivial task. Systems with similar goals have ended up sacrificing some part of SQL along the way (see Chapter 6). We provide consistency at the transport level, and therefore take a different approach in trying to scale out a database. We leave the engine intact, but allow simultaneous engines to co-exist in the system and take care of correct synchronization ourselves. As a result, we provide full SQL while allowing a new dimension of scalability.

Chapter 3

One-Round Transactions

One-round transactions use two-phase commit to communicate with multiple servers in parallel. They are designed to complete in the shortest amount of time theoretically possible, which is two network round-trip times (RTTs)¹ [49, 72]. The term one-round [72] refers to the fundamental property that distinguishes them from traditional database transactions, which allow reading and modifying data in multiple rounds of communication during execution. A one-round transaction has 2 key properties: (1) all data items referenced by it should be known when execution starts, and (2) execution will run to completion, i.e., will not abort for application-defined reasons or any form of failure. Any number of transactions can run simultaneously for increased throughput. Under certain conditions, where a transaction only reads data from the servers or only touches data stored on a single server, one-round transactions can commit in a single RTT.

Applications can extend the one-round transaction framework to implement any kind of custom transactions.
The application uses the client library to create, populate and commit transactions; the library internally communicates with the servers storing the data. Servers only respond to requests that follow the commit protocol. The application has to implement its own server handlers to deal with the operations it uses. The framework handles committing each transaction to completion. Two different isolation modes are provided, one that uses short-lived locks and a lock-free one that uses clock vectors, each useful under different scenarios.

¹ Assuming each application message is sent to the receiver in one RTT, which is generally true in steady-state communications under normal network conditions.

Listing 3.1: Transaction Client API

// Create a new transaction.
Tx new_tx();

// Add a new item with id 'dataId' on server 'serverId' to the transaction.
// 'op' specifies the app-defined operation that should be performed on the
// data item. 'readOnly' specifies if 'op' will modify the item or not.
tx_add_item(Tx tx, Id serverId, Id dataId, Operation op, Data data, bool readOnly);

// Try to commit the transaction and return commit success result. Fills
// 'result' with items returned by the transaction.
bool commit_tx(Tx tx, out List result);

As part of the commit protocol, state modifications are also protected from failures. In the event of a failure, a separate tool known as the recovery manager is required to clean up half-committed transactions. Parts of the data may become unavailable due to failures and while recovery is in progress, but they will never be in an inconsistent state. In the remainder of this chapter we present details of one-round transactions.
For simplicity, from now on we use the word transaction when referring to one-round transactions.

3.1 API

An application trying to use transactions is exposed to a client library API and a server interface. The client API is used for creating and committing transactions. The server interface has to be overloaded to provide the necessary functionality of the server.

3.1.1 Client

The client API can be found in Listing 3.1. Each client library instance is assigned a unique client identifier. When a transaction is created, it is assigned a globally unique transaction identifier. The transaction id is produced as the concatenation of the client id and a monotonically increasing counter.

A data item is referenced by a tuple specifying the server id which stores the item and the item's unique id. Unlike distributed hash tables [112], where the server id of an item is inferred from its id and the network topology, we allow items to freely migrate between servers for load balancing purposes. The application can use transactions to perform an operation on any item by simply adding it to their items. When adding an item, the application also has to specify if it might be modifying it. This information is used to improve the commit performance of the transaction.

Commit either fails or succeeds, returning a custom list of messages provided by the servers in response to the transaction. During commit, transient problems (such as lock conflicts and server failures) are handled internally. Commit failures only happen when the transaction encounters a permanent problem which cannot be resolved without help from the application, e.g., concurrently modified data. The result list of the transaction includes reasons as to why commit failed upon failure, or computation results in the case of success.

3.1.2 Server

The server functionality is extended by overloading two handlers as part of the server interface (Listing 3.2).
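In a general-purpose language, this two-upcall interface might be rendered roughly as follows (a Python sketch; only the prepare/finalize upcall names come from the text, while the toy counter server and its fields are our own illustration):

```python
from abc import ABC, abstractmethod

OK, FAIL, RETRY = "OK", "FAIL", "RETRY"

class TransactionServer(ABC):
    """Application servers extend this class and fill in the two upcalls."""

    @abstractmethod
    def prepare(self, tx_id, items, result):
        """Validate the transaction's items; return OK, FAIL or RETRY."""

    @abstractmethod
    def finalize(self, tx_id, items, proceed):
        """Apply the transaction's changes if proceed is True, else roll back."""

class CounterServer(TransactionServer):
    """Toy server whose only custom operation increments named counters."""

    def __init__(self):
        self.counters = {}
        self.staged = {}

    def prepare(self, tx_id, items, result):
        # Stage the items; a real server would also validate and lock them.
        self.staged[tx_id] = list(items)
        return OK

    def finalize(self, tx_id, items, proceed):
        if proceed:
            for name in self.staged[tx_id]:
                self.counters[name] = self.counters.get(name, 0) + 1
        del self.staged[tx_id]

server = CounterServer()
vote = server.prepare(1, ["hits"], {})
server.finalize(1, ["hits"], vote == OK)
```

The framework would invoke prepare when a PREPARE message arrives and finalize once the transaction's outcome is known; the application never sees the commit protocol itself.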
The handlers are invoked for each transaction and give the application full control over them.

The first handler, prepare, asks the application to validate the assumptions of the transaction to ensure it can proceed. This involves checking the existence of items referenced by the transaction and, when required, blocking the assumptions from changing until the transaction is done. At the end, the application decides if the transaction can proceed, has failed due to a permanent problem, or has encountered a temporary problem and should be retried.

The second handler, finalize, is invoked once the final outcome of a transaction has been decided with regard to all the involved servers. The handler notifies the application to proceed with applying the changes of the transaction to the state, or to rollback and abort.

Listing 3.2: Transaction Server Upcalls

// Check to ensure the transaction is operating on valid 'items'. Return
// INVALID if any of the items are invalid, FAIL if there is a concurrent
// conflict with another transaction. Else return OK and lock the item if
// necessary. Anything that should be returned is stored inside 'result'.
VoteType prepare(int64 txId, List items, out List result);

// Finalize execution of the transaction to either proceed or abort.
finalize(int64 txId, List items, bool proceed);

3.1.3 Sample Transaction

Listing 3.3 illustrates a sample transaction that operates on three items stored on two different servers. The transaction performs the custom defined operations WRITE to write the given data to the item, READ to read the contents of the item, and COMPARE to compare the provided value against the current value of the item.

Listing 3.3: Sample Transaction

tx = new_tx();
tx_add_item(tx, server1, item1, WRITE, 'Data to store', false);
tx_add_item(tx, server2, item2, READ, Null, true);
tx_add_item(tx, server2, item3, COMPARE, 'Value to compare to', true);
List result;
if commit_tx(tx, result) == true:
    // Transaction succeeded.
    // result[item2] contains contents read by the transaction.
else:
    // Transaction failed.
    if result[item2] == INVALID or result[item3] == INVALID:
        // 'item2' or 'item3' no longer exist.
    else:
        // Compare failed on 'item3' which has a different value.

Once commit is initiated, the outcome of the transaction is returned. This transaction can only fail under two scenarios: either one or more of the referenced items do not exist, or the comparison of the third item failed against the latest value on the server. Each failure would trigger a specific application response, and the failures can be differentiated from each other by using the result messages returned from the servers. If the transaction commits successfully, the modifications, in this case writing to the item, are applied and the result will include valid data read from the servers.

The server-side implementation of the sample transaction using locks can be found in Listing 3.4.
In the prepare upcall, the server first checks for invalid item references and locking conflicts. It then proceeds to lock items to ensure no other concurrent transaction can invalidate this transaction's assumptions. Finally, requested data is piggy-backed to the client. Locks are released in the finalize upcall and, if the outcome is to proceed, modifications are made to the stored data.

Listing 3.4: Server-side Implementation of the Sample Transaction with Locks

VoteType prepare(int64 txId, List items, out List result) {
    // Check for invalid items or lock conflicts.
    foreach (item in items):
        if not item in stored_items:
            result[item] = INVALID;
            return FAIL;
        if locked(item):
            return ABORT;

    // Lock items for this transaction and perform preliminary actions.
    foreach (item, op in items):
        lock(item, txId);
        switch op:
            case WRITE:
                break;
            case READ:
                result[item] = stored_items[item];
                break;
            case COMPARE:
                if stored_items[item] != item:
                    return FAIL;
                break;

    return OK;
}

finalize(int64 txId, List items, bool proceed) {
    if proceed == true:
        foreach (item, op in items):
            switch op:
                case WRITE:
                    stored_items[item] = item;
                    break;
                case READ:
                case COMPARE:
                    break;

    // Unlock items if locked for this transaction.
    foreach (item, op in items):
        if txId == lock_owner(item):
            unlock(item, txId);
}

3.2 Commit Protocol

The client trying to commit a one-round transaction is the Coordinator, and each server whose id is included in the items of the transaction is a Participant. To commit, the Coordinator sends the transaction to each Participant, and each Participant votes to either proceed or not. The outcome of the transaction is then decided by the aggregation of all the votes; if all Participants vote to proceed, the outcome is to proceed, else the outcome is to abort.
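The aggregation rule is simple enough to state in a few lines (a sketch; the OK/FAIL/RETRY vote names follow the text, and the retry signal reflects the deadlock-avoidance rule described in Section 3.2.1):

```python
OK, FAIL, RETRY = "OK", "FAIL", "RETRY"

def decide(votes):
    """Return (proceed, retry). The transaction proceeds only if every
    Participant voted OK. A RETRY vote (e.g., a lock conflict) aborts this
    attempt but tells the Coordinator to back off and try the commit again;
    a FAIL vote aborts the transaction permanently."""
    proceed = all(v == OK for v in votes)
    retry = (not proceed) and all(v in (OK, RETRY) for v in votes)
    return proceed, retry

# All Participants agree, so the transaction proceeds.
outcome = decide([OK, OK, OK])
# One lock conflict: abort now, but the commit can be retried later.
conflict = decide([OK, RETRY])
# A permanent failure on any Participant aborts the transaction for good.
permanent = decide([OK, FAIL, RETRY])
```

The Coordinator then distributes the decided outcome to the Participants in the FINALIZE round.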
Once decided, the Participants finalize execution of the transaction. If the final outcome is to proceed, modifications requested by the transaction are applied to the stored data. Otherwise, they are simply discarded.

In cases where the vote can be inferred by all Participants without an explicit message from the Coordinator, the commit protocol completes in a single RTT. Specifically, when a transaction only touches data on a single server, or the transaction only performs read operations, there is no need for the extra round of communication.

At any given time, multiple transactions may be executing concurrently, and they should be isolated from each other to prevent race conditions. Isolation happens using two different methods that vary in the details of voting and in when a transaction is run by each Participant and the upcalls are dispatched. However, each server could be servicing both kinds of transactions at the same time. Figure 3.1 illustrates the messages exchanged during commit, and Table 3.1 contains the description of the messages.

Figure 3.1: Transaction commit protocol. (a) Lock Mode; (b) Clock Mode 1; (c) Clock Mode 2.

Table 3.1: Description of message fields used in the commit protocol.

txId: Globally unique transaction Id.
items: List of items in the transaction intended for the recipient Participant. Each item has a serverId specifying the target Participant, itemId uniquely specifying the target item, operation defining the custom operation that needs to be done, and a readOnly flag indicating if operation will modify the item's content or not.
ro: Flag specifying if the transaction only consists of read-only operations.
vote: The vote of a Participant, which could be either OK, FAIL or RETRY.
results: List of messages sent from the Participants to the Coordinator.
outcome: Boolean flag describing the outcome of commit.
timestamp: Virtual time at which the transaction is executed by all Participants.
clientTs: Virtual clock maintained by each client.
itemTs: Virtual clock maintained by a server per item.
proposedTs: Virtual execution time of a transaction proposed by a Participant.

3.2.1 Lock Mode

In this mode, logical locks are used for isolation (Figure 3.1a). The core voting and locking protocols used in lock mode are similar to Sinfonia [49], with the addition of custom operations via server upcalls.

The Coordinator sends a PREPARE message along with txId, items and ro to the Participants, which invokes the prepare upcall. Each Participant validates that all items referenced in items exist and are unlocked. If invalid items are referenced, the vote is FAIL; else, if there is a locking conflict, the vote is RETRY. Otherwise, items are locked and read-only operations are executed with their results stored in results. If any operation fails, the vote is FAIL; else the Participant votes OK.

Once the Coordinator receives all votes, it can decide the outcome of the transaction. Commit fails if any Participant votes FAIL or RETRY, the latter case being necessary for preventing deadlocks, for which commit is retried after waiting a random amount of time with exponentially increasing value. Only if all Participants vote OK will commit succeed. The Coordinator sends a FINALIZE message with txId and outcome to the Participants (invoking the finalize upcall), and notifies the application unless commit has to be retried. In the finalize upcall, write items are applied and locks are released.

3.2.2 Clock Mode

In this mode, inconsistencies caused by race conditions are avoided by having all servers execute transactions in the exact same serial order. To achieve serializability, transactions are executed on all servers using a globally unique order, which is decided using clock vectors [72].
Hence, there is no need to lock data items. The clock vector protocols were taken from Granola [72], where we increased the synchronization granularity to per-item clocks for improved performance.

In clock mode, a transaction commits at a specific timestamp, a virtual execution time. Each client maintains its own clock, clientTs, as the highest timestamp of any committed transaction it has observed so far. Upon commit, the Coordinator sends a PREPARE message along with txId, items, ro and clientTs. The Participants invoke the upcall and the application returns vote. Additionally, each Participant proposes a timestamp proposedTs for the transaction as MAX(clientTs, ∀ itemTs) + 1.

In the base protocol (Figure 3.1b), instead of contacting the Coordinator with their votes, the Participants directly send txId, vote and proposedTs to each other. Each time a Participant receives a vote, it sets proposedTs to the maximum of what it has and what it receives. This ensures all Participants will eventually agree on the same final timestamp for the transaction.

Per item, Participants maintain a sorted list of transactions in increasing order of proposed timestamps, with txId used as the tie breaker. Transactions are then executed in order. A Participant never invokes the finalize upcall on an item until all votes have been received. This ensures the exact same execution ordering of dependent transactions by all Participants. Once a Participant finishes executing a transaction, it sends a notification to the Coordinator which includes txId and timestamp. When the Coordinator receives completion notifications from all Participants, it updates its clientTs to be greater than or equal to timestamp.

The base protocol in clock mode has the same latency as lock mode, i.e., 1.5 RTTs, but the message complexity grows quadratically with the number of Participants. This causes trouble in cases where a transaction may touch items on many servers.
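The timestamp agreement above can be sketched in a few lines. This is an illustrative Python model of the proposedTs computation and its convergence property, not the actual Granola-derived implementation; the function names are ours:

```python
# Illustrative model of clock-mode timestamp agreement (not the actual
# implementation; names are our own).
def propose_ts(client_ts, item_ts_list):
    """A Participant proposes MAX(clientTs, all itemTs it touches) + 1."""
    return max([client_ts] + item_ts_list) + 1

def final_ts(proposals):
    """Participants exchange proposals and keep the running maximum, so
    every Participant converges on the same final timestamp."""
    return max(proposals)

# Two Participants with different per-item clocks propose different
# timestamps, but agree once all proposals have been exchanged.
p1 = propose_ts(client_ts=10, item_ts_list=[7, 12])   # proposes 13
p2 = propose_ts(client_ts=10, item_ts_list=[15])      # proposes 16
assert final_ts([p1, p2]) == 16
```

Since max is commutative and associative, the order in which votes arrive does not matter; every Participant ends up at the same value.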
The modified commit protocol in Figure 3.1c offloads deciding the timestamp of the transaction to the Coordinator, with an added extra half RTT (same as lock mode).

3.2.3 Lock vs. Clock

Each of the two isolation modes has an advantage over the other in certain scenarios. If multiple transactions simultaneously access the same data items, clock mode avoids three sources of latency present in lock mode. First, as there are no lock conflicts, commit never has to retry the transaction. This avoids extra RTTs for the retry and saves wasted CPU and bandwidth resources. It also doesn't create extra contention on other items in the transaction, which would otherwise have to be constantly locked and freed due to locking failures on another server. Second, as a result of no retries, there is no exponential back-off and no idle periods for the transaction. Third, execution of operations on different servers isn't synchronized, avoiding wait periods for other servers to catch up.

Clock mode performs better under extreme contention compared to lock mode, but imposes limitations on the types of operations that may be performed when the transaction touches multiple items. In lock mode all items are locked before voting. This means they will not change until the transaction is done executing. In clock mode, there is a window of time between when the vote is issued and when the transaction is done during which the items may be modified. Extra care should be taken by application developers when designing their operations to ensure no consistency requirements are violated, and they should be aware of this limitation when choosing the right isolation mode for their application.

We found a hybrid of lock and clock mode to be very useful, in which isolation is done using clock vectors but we still use locks for certain items to ensure correctness. We use this technique in Innesto (Chapter 4).
The insight here is that, for performance reasons, data items aren't locked when using clock mode, while item containers (partitions) are locked to prevent items from disappearing during commit.

3.3 Durability & Failure Recovery

As part of the one-round transaction commit protocol, a Participant makes a stable log of important information about the transaction before issuing its vote. Logging can either be done synchronously to disk or by replicating the transaction to backup servers to avoid disk access. In lock mode, if the vote is OK a Participant logs txId, the list of Participants, and the items that don't have their readOnly flag set (i.e., write items). In clock mode, it additionally logs proposedTs.

If a Participant fails, its dedicated log can be used by a new server to reconstruct the failed Participant. If the Coordinator fails, a Participant can directly communicate with other Participants to finalize lingering transactions. While recovery is in progress, parts of the data affected by the failures will become unavailable. This is the price paid for consistency based on the CAP theorem [84]. Beyond this point we can assume data modified via transactions is fully reliable, recoverable and always consistent.

Chapter 4

Innesto

4.1 Introduction

Key/value datastores serve as the backbone of scalable services; they host hundreds of millions of data items and simultaneously service millions of clients. Current key/value datastores, such as Dynamo [76], Cassandra [94] and BigTable [65], scale across a large number of servers and provide high availability guarantees under extreme failure scenarios, such as massive network outages or hardware failures [76].
Relaxing the data model to provide eventual consistency, through a simplified API that allows accessing data individually via put, get or remove, has been a major feature enabling such scalability and high availability.

While key/value stores are essential to online cloud-based applications, trading out everything in favour of scalability and availability does not present an appealing solution for all applications dealing with big data. This has caused a general lack of interest from enterprises in the NoSQL paradigm [113]. The problem is caused by three important limitations in the design of traditional key/value stores:

• Data consistency: The eventual consistency data model implies that in a distributed system, if no new updates are made to a given data item, eventually all accesses to that item will return the last updated value [123]. However, there is a window of time after an item is updated during which different reads on the same item will return different values. This pushes unwanted complexity up to the application. Developers have a hard time reasoning about data freshness uncertainty, which can lead to correctness issues in application logic.

• API: The narrow API to put, get and remove a data item based on a unique key is insufficient for an application to freely express how data should be retrieved. Applications require accessing the data based on secondary attributes. This forces developers to use various techniques when modelling their data to be able to retrieve items by means other than their key [38].¹ To this end, commercial cloud service providers have developed their own languages which provide a means of finding data using queries [10, 21].

• Concurrency: ACID (i.e., Atomicity, Consistency, Isolation, Durability) properties mandate that read/write operations may be grouped into transactions. Operations in a transaction are executed together while taking the data from one valid state to another.
The overall outcome of executing multiple concurrent transactions should appear as if they were executed serially, one by one, and no form of system failure should violate these properties. In a system that allows multiple concurrent read/write operations on data, system support is required to correctly deal with concurrency. This feature is missing from the basic key/value API of traditional systems. Without some form of transactions (no matter how primitive), application developers can't deal with race conditions and concurrency.

In related work, numerous improvements have been made to amend the so-called first-generation key/value datastores by extending their API to include simple range searches [69], support conditional operations as a primitive form of transactions [6, 105], or provide a stronger data consistency model [108]. However, the largely neglected missing feature from the first generation is a high degree of expressibility when dealing with data retrieval. In the age of big data, recommendation systems process massive amounts of data through numerous different attributes to provide per-user customizations. Such applications need to be able to freely reference data items via various attributes in the form of multi-attribute range searches, which is why they are heavily coupled with the relational SQL data model due to its flexible queries. There are simply no direct mappings from such well-understood OLTP tasks to the basic key/value API, calling for a new generation of key/value datastores. HyperDex [78] marks the birth of second-generation datastores, which natively support search as part of their API.

¹ Interestingly, when given the choice, developers opt for a semi-relational storage with bad write performance (Megastore) over a scalable eventually-consistent key/value datastore with superior performance (BigTable) [71].
However, HyperDex is not architected for data with high dimensionality.

Our goal is to address the multi-attribute range search issue on highly dimensional data in a scalable fashion, to help bridge the gap between the feature set of traditional key/value stores and the performance limitations of relational databases. We thus present Innesto, a searchable key/value datastore for data with high dimensionality. Innesto overcomes all three limitations of traditional key/value stores. It provides a strong data consistency model: application developers can assume Innesto is a single-server, single-threaded storage system, while in fact it is fully distributed and scalable. The API is richer than that of traditional key/value stores; Innesto supports multi-attribute range search on any number of secondary attributes of data items. It also provides ACID transactions for grouping operations to deal with concurrency.

Innesto uses a combination of old and new techniques. Efficient range search is supported by using spatial partitioning [31] and maintaining a hierarchy [24]. Various different types of range search on different attributes are supported by using a set of parallel clones of the data [78], where each clone is optimized for a special set of range requests. Internally, Innesto uses high-performance one-round transactions to consistently update the data and its clones in parallel. One-round transactions execute client requests in a consistent and fault-tolerant manner, even in the face of group server failures. Concurrent operations are isolated from each other either by using short-lived locks on data items or through clock vectors [72], depending on the access patterns of the application. To avoid the imbalance due to spatial partitioning, Innesto dynamically partitions the data and performs online load balancing by restructuring partitions. Each operation is carefully designed to execute in the least amount of time possible.
Multi-versioning is used to reduce the overhead of supporting parallel clones, so that more types of queries can be answered efficiently.

We evaluate our implementation of Innesto using an industrial cloud benchmark and compare its performance with Cassandra [94], a scalable, widely used key/value datastore, and show it has competitive performance while offering a myriad of extra features. Our results also show Innesto outperforms HyperDex [78] in every aspect. In Section 4.2 we present an overview of Innesto. Sections 4.3 & 4.4 elaborate on how key/value operations and multi-attribute search are executed, while Section 4.5 presents Innesto's extended API. Evaluation results are in Section 4.6, and a review of related work can be found in Section 4.7. We conclude in Section 4.8.

Table 4.1: Innesto's API, which includes basic key/value operations plus search.

put(table, key, attribs, value): Insert value associated with key.
get(table, key) returns value: Get value associated with key.
remove(table, key): Remove value associated with key.
search(table, req_attribs) returns list<value>: Get all values satisfying req_attribs.

4.2 Overview

4.2.1 Data Model & API

Innesto is a key/value datastore designed to extend the feature set of a normal datastore while maintaining scalability. The goal is to allow new types of applications that cannot tolerate the limitations of key/value stores to migrate to the cloud, while existing applications can be extended using the superior features offered by Innesto.

A data item has an immutable key, a set of typed secondary attributes, and a value of arbitrary size. Innesto uses a table abstraction to logically group data items together using an application-defined schema. The schema sets the number and types of the secondary attributes. An attribute can have any data type that has a definable strict ordering of values.
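To make the API in Table 4.1 concrete, the sketch below models a single table as an in-memory dictionary supporting the four operations. This is a toy single-process illustration of the semantics only; the real system distributes each operation across servers via one-round transactions, and the class name and range-constraint encoding are our own assumptions:

```python
# Toy in-memory model of the API in Table 4.1 (illustration only).
class ToyTable:
    def __init__(self):
        self.items = {}                      # key -> (attribs, value)

    def put(self, key, attribs, value):
        self.items[key] = (attribs, value)

    def get(self, key):
        return self.items[key][1]

    def remove(self, key):
        del self.items[key]

    def search(self, req_attribs):
        """req_attribs maps an attribute name to an inclusive (lo, hi) range."""
        return [value for (attribs, value) in self.items.values()
                if all(lo <= attribs[name] <= hi
                       for name, (lo, hi) in req_attribs.items())]

table = ToyTable()
table.put("p1", {"X": 1, "Y": 2, "Z": 3}, "item-1")
table.put("p2", {"X": 5, "Y": 2, "Z": 9}, "item-2")
assert table.get("p1") == "item-1"
assert table.search({"X": (0, 3)}) == ["item-1"]           # range on X
assert table.search({"Y": (2, 2)}) == ["item-1", "item-2"]  # exact match on Y
```

An exact-value constraint is simply a degenerate range (lo == hi), which is how the sketch expresses the "exact value" case described above.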
This abstraction allows for an effortless migration from existing systems that rely on traditional databases to Innesto, while other applications can use a single table representative of the global namespace offered by normal key/value datastores.

Table 4.1 presents Innesto's API. The basic API of a datastore uses an item's key to put, get and remove the item in a specific table. For put, the item's secondary attributes and its value should also be provided. Innesto's API also supports search on secondary attributes. The user specifies constraints on the secondary attributes, such as an exact value or a given range, and Innesto will return a list of items with attributes satisfying the constraints. For example, a table storing items based on their physical X, Y and Z locations can search for items within a specific physical region, or even items that are at most a specific distance away from a given point in 3D space.

4.2.2 Architecture

Figure 4.1: Logical architecture of Innesto components. Each component is fully distributed.

Figure 4.1 illustrates the logical components of Innesto. Each component is fully distributed between Innesto servers, which can simultaneously act as multiple different components. Client requests are served by the Proxy, which can scale up and down by spawning new Proxies to accommodate current demand. Each Proxy is responsible for identifying the set of servers that need to be contacted to complete each operation. It then communicates with the rest of the components using one-round transactions and ensures each operation runs to completion, after which it notifies the client. Data items are stored in the Multi-Version Storage (MVS) component and a number of Search Clones, which are also distributed between all servers to accommodate load. MVS is the main repository for storing data, while the Search Clones are used for efficiently executing search requests.
Each clone is optimized for a specific type of search, and requests are routed to the most suitable clone. We discuss MVS and the Search Clones in Sections 4.3 & 4.4.

4.2.3 Strong Consistency

Using one-round transactions to execute key/value operations allows Innesto to provide ACID (atomicity, consistency, isolation and durability), the strongest possible level of data consistency. This means that when the execution of an operation completes, any other operation completed after it will reflect its results. For example, search returns a consistent snapshot of items, no matter what clone it contacts.

4.3 Multi-Version Storage (MVS)

4.3.1 Partitioning

MVS is a distributed storage system for data items. A full copy of each item is stored using its key. Data is distributed by spatially partitioning the key space. Each data item is versioned, where the version number is incremented each time the item is modified with write operations (put and remove). When reading an item (get), an explicit version of the item can be requested, or the latest version is returned. Adjustable parameters define the number of versions to keep and the time to live of each older version, which keep the overall overhead of multi-versioning bounded.

Figure 4.2: Spatial partitioning and distribution of a table. The root partition covers the entire key space and each partition keeps references to child/parent partitions. Only leaf (gray) partitions store the actual data. Partitions are distributed between servers for load balancing.

Each table is distributed between servers into partitions, as illustrated in Figure 4.2. Note that only single-dimensional partitioning is depicted; however, Innesto supports partitioning on any number of dimensions. A partition has specific bounds and can only store items with keys that fall within its bounds. Each table starts as a single partition covering the entire key space. As the load of a partition increases, it is split into smaller partitions.
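Locating the partition responsible for a key amounts to walking the child references from the root down to a leaf. The sketch below is our own minimal illustration of that descent, using one-dimensional half-open bounds for simplicity:

```python
# Illustrative sketch of locating the leaf partition for a key by walking
# the hierarchy from the root (one-dimensional bounds, our own representation).
class Partition:
    def __init__(self, lo, hi, children=None):
        self.lo, self.hi = lo, hi        # partition bounds: keys in [lo, hi)
        self.children = children or []   # empty list => leaf storing data

    def contains(self, key):
        return self.lo <= key < self.hi

def find_leaf(part, key):
    assert part.contains(key)
    for child in part.children:
        if child.contains(key):
            return find_leaf(child, key)   # descend into the matching child
    return part                            # leaf partition owning the key

# Root covers the whole key space and has been split into two children.
root = Partition(0, 100, [Partition(0, 50), Partition(50, 100)])
assert find_leaf(root, 42) is root.children[0]
assert find_leaf(root, 99) is root.children[1]
```

Because each partition's bounds nest inside its parent's, the descent visits one partition per level, so knowing the root is enough to locate any leaf.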
The new child partitions each cover smaller parts of the parent's key space and are placed on different servers to balance load. Under-utilized partitions are merged to reduce fragmentation. Each partition has a globally unique ID with no strict server affinity; it can freely migrate between servers at any time. This allows us to decouple partitions from physical machines for better load balancing. Partitions keep references to their parent and children. Thus, by knowing how to find the root partition of a table, all other partitions of the table can be subsequently located.

4.3.2 Non Structure-Modifying Operations

Normal key/value operations don't directly change the structure of a table (i.e., split/merge/migrate partitions). A single one-round transaction is used to execute an operation, and it executes in one RTT since in MVS each operation touches exactly one partition. The transaction execution protocol ensures proper isolation from other concurrent operations, so a partition can act as a simple single-server key/value store with a trivial implementation. For put, the transaction includes the item's key, attributes and data, while for remove it just includes the key. For get, the item's version is optionally provided with the key, and its value is returned along with the server's vote as part of the commit protocol.

Each partition has a logical structure lock associated with it. Regardless of the isolation mode (i.e., lock mode or clock mode), a transaction temporarily blocks the partition it is referencing from re-structuring by acquiring the structure lock on the partition. This prevents the partition from splitting/merging/migrating while the transaction is executing. If the partition cannot be locked, it means a re-structure is in progress and the operation is touching the wrong partition: if the partition is splitting/merging, the transaction is referencing the wrong partition; if it's migrating, the transaction is talking to the wrong server.
Regardless, the transaction aborts and retries using the correct partition.

4.3.3 Structure-Modifying Operations

The structure of partitions is also altered using one-round transactions, to be coherent with operations. Any structure-modifying operation has to acquire the structure lock on the partitions it touches before proceeding. This ensures partitions don't simply disappear while an operation is halfway through execution. The process of acquiring the structure lock waits for transactions in flight to finish, while delaying new transactions from starting until the lock is released. Once the locks on the partitions are acquired, re-structuring begins. Since each partition has its own structure lock, only small parts of a table become unavailable when being re-structured. All other operations touching other partitions proceed as they normally would.

Partitions are split based on load in order to distribute overall load between more servers. Two factors contribute to a partition's load. First is data congestion, where the number of items stored in a partition exceeds a threshold. Second is when the rate of transactions a partition serves exceeds its capacity. Since split is triggered under heavy load, it is designed to complete in one RTT. A special one-round transaction is used to create new child partitions with specific bounds, populate them with the data they should hold, and push them out to other servers.

Strong consistency requires serialization of operations. If certain items tend to become over-popular, the load balancer will keep splitting the partitions hosting such items, down to partitions holding a single hot item. This provides the finest granularity of serialization possible while still maintaining consistency.

Migrating a partition to another server is useful in various scenarios. For example, when a new server is spawned to scale the system, or when a server is to be terminated to reduce operational costs, partitions need to be migrated between servers.
Also, in the case where the load of each partition is below the threshold but the aggregate load of all partitions exceeds the capacity of the underlying physical host, migrating some partitions can help balance load. Migrating a partition uses a one-round transaction to move data off one server onto another and simultaneously fix parent/child references. Thus migrate executes in two RTTs. Merge is used to reduce fragmentation by combining under-utilized partitions into one. It takes a couple of transactions to trigger and execute merge.

4.3.4 Partition Hierarchy

The hierarchy formed by the partitions of each table allows every server to locate all the partitions belonging to that table by only knowing where the root is. This hierarchy is used to efficiently compute the servers that need to be contacted per operation. However, in a large system with billions of items stored in tens of thousands of partitions spread across hundreds of servers, it is neither efficient nor necessary for every server to know where each partition is exactly located. It need only be able to locate one when required.

To prevent constant network accesses each time a server tries to execute an operation, the partition hierarchy of each table is lazily cached by a server as soon as it initiates its first operation on the table. This provides the servers with item locations on a need-to-know basis and speeds up operation execution. Thus at any given time, no server has to have a global view of the partition hierarchy.

Over time, parts of a server's cache will go out of sync with the actual hierarchy due to structure modifications. This will cause operations using stale cache entries to reference the wrong partitions or servers. To prevent this, the Participants of each one-round transaction validate that the transaction was initiated using the right partition hierarchy. If not, the operation is rejected and the Coordinator is notified by a special message piggy-backed on the vote. The Coordinator will then backtrack the steps it took when executing the operation to find the stale parts of its cache and re-synchronize them. Lazy caching allows structure modifications to be spaced through time when being propagated to the servers, avoiding an immediate broadcast to hundreds of machines upon each change. This significantly reduces the network costs of structure modifications.

4.4 Search

4.4.1 Search Clones

Figure 4.3: Search Clones are used to partition data based on secondary attributes. Items are grouped into attribute groups to better support range search. Each Ki is a full copy of an item.

Unlike normal key/value operations that use keys to uniquely identify data items, search is required to allow the caller to specify some constraints on secondary attributes, and should return all data items in a table that satisfy the given constraints. A constraint could be a specific value on the attribute, a given range specified by minimum and maximum values, or ∗, which means to search the full possible range of the attribute.

Items are stored in MVS based on their key, which is oblivious to secondary attributes. To provide search on a table, Innesto creates parallel Search Clones [78]. Each Clone is a separate copy of the entire table, partitioned differently based on a subset of secondary attributes. In our example of storing items with secondary attributes equal to their physical X, Y and Z position, a Clone may be partitioned based on any subset of the set {X, Y, Z}. In each Clone, items with the same value on the subset of attributes are grouped together into an attribute group. This ensures items in the same attribute group are always positioned in the same partition. Figure 4.3 illustrates how items are put into attribute groups.
A Clone using the subset {X} will place all items with the same X value in the same attribute group, while a Clone using {X, Z} will group items with the same X and Z together.

Data stored in the Clones is kept consistent with MVS by using the same one-round transaction which updates MVS to also update the Clones. This will cause write operations (put & remove), which used to touch one partition, to now touch multiple partitions (one partition in MVS and one in each Clone), forcing them to complete in two RTTs. Partitioning based on secondary attributes helps improve search performance, since the Coordinator can quickly traverse the partition hierarchy of a table and identify partitions that hold potential results. Their data is read using a single read-only one-round transaction that completes in one RTT and returns a consistent snapshot of data upon completion. Adding Search Clones thus improves search performance at the cost of reduced write performance. Recent studies have found a high read/write ratio in large-scale key/value stores [50], so such a trade-off seems reasonable, while tables needing high write performance can disable Clones altogether.

Most applications know the kind of search they will most likely execute, and the application is given the freedom to specify which Clones should be created at table creation time, if any. It is also possible to add and remove Clones at runtime, after the table has been created, though the table will be unavailable while adding a Clone. To add a Clone, write operations have to be paused until the new Clone is successfully added. The servers hosting MVS are told to duplicate all the data they have for the table into the new Clone. Removing a Clone is much faster, and write operations can execute normally while a Clone is being deleted.

It might seem that with D secondary attributes, a single Clone that partitions based on all D attributes is enough to answer any type of search.
While this is true in theory, in practice it is not efficient, a problem known as the curse of dimensionality [54]. The number of partitions grows exponentially as more attributes are used for partitioning. This will lead to numerous partitions spread across almost all the servers. Hence, search requests with ∗ on one or more attributes will be forced to contact all the servers. In contrast, partitioning using fewer attributes reduces the number of partitions and increases the chance that each search will contact a single server, which reduces network load. However, since partitions can only be split down to attribute-group granularity, caution needs to be taken when requesting Clones. Imagine a secondary attribute being a single bit: partitioning millions of items based on a single bit is very inefficient.

4.4.2 Sparse Clones

With large datasets, duplicating multiple copies of the data is not a reasonable approach. Innesto allows Sparse Clones that only keep a copy of an item's key, secondary attributes² and version number. Since data in the Clones is updated synchronously with MVS, the version numbers of the items are consistent with each other. This reduces the space required for storing arbitrary-size items to a small predictable overhead.

Using Sparse Clones only affects search performance. A search on a Sparse Clone returns the key and version number of each item. The values of the results are then retrieved from MVS using extra get operations on the specific versions. This adds an extra RTT to search. Table 4.2 shows the overall summary of operation execution complexity and storage space required for a table that uses no Clones, full Clones or Sparse Clones.

Table 4.2: Innesto's latency and storage requirements for storing N items with different configurations.

MVS only: write 1 RTT, read 1 RTT, search n/a, storage O(N).
MVS + K Clones: write 2 RTT, read 1 RTT, search 1 RTT, storage O(KN).
MVS + K Sparse Clones: write 2 RTT, read 1 RTT, search 2 RTT, storage O(N).
Write costs do not include the stable log cost: for replication add an extra RTT, for disk logging add a synchronous write duration.

Usually MVS will have the requested version of an item. If for any reason a specific older version of an item is no longer available, the results are invalidated and search is retried. Note that each partition can separately tune its parameters of how many versions to maintain and for how long. Simple statistical analysis techniques can be used to tune these parameters if search operations start to fail. Also, a partition can switch to and from being Sparse at any time. This is useful if a write-heavy partition is required to store long histories of data.

² Only the attributes used for search. We expect the size of secondary attributes to be small and comparable to the key size, and much smaller than the actual data. For applications where the data is represented as a combination of its attributes, an order-preserving hash function can be used to decrease their size.

4.5 Extended API

4.5.1 Operation Transactions

Composability is an important feature of transactions which allows the functionality of multiple transactions to be combined into one. Since all Innesto operations use one-round transactions, any combination of operations can be atomically batched together in the form of larger transactions which execute all at once or not at all, e.g., put, remove and get a group of items altogether. Also, the operations need not be limited to a single table. While these are still one-round transactions, and therefore not as powerful as full-fledged database transactions, they provide enough functionality for many OLTP tasks.

4.5.2 Conditional Operations

Since voting is part of the one-round transaction commit protocol, conditional operations are another feature added on top of basic key/value operations. A conditional operation is a normal key/value operation that only completes if a certain number of conditions hold true.
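In effect, a conditional operation acts as a compare-and-swap primitive. The sketch below is our own single-process model, not Innesto's API: a plain dict stands in for a table, and the retry loop shows how an application would increment a shared counter using a conditional put.

```python
# Sketch of incrementing a shared counter with a conditional put
# (compare-and-swap). In the real protocol the condition is checked at
# vote time; here a dict stands in for the table (our own illustration).
def cond_put(store, key, expected, new_value):
    """Put new_value only if the current value equals expected."""
    if store.get(key) != expected:
        return False               # condition violated: commit fails
    store[key] = new_value
    return True

def increment(store, key):
    while True:                    # retry until no concurrent writer wins
        current = store[key]       # read the current counter value
        if cond_put(store, key, current, current + 1):
            return current + 1     # conditional write succeeded

store = {"counter": 41}
assert increment(store, "counter") == 42
assert store["counter"] == 42
```

If another client commits between the read and the conditional put, the condition fails and the loop simply re-reads and retries, which is the race-free behaviour the voting step provides.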
For example, a conditional put is one that only inserts the item if the item's value is equal to the specified condition. Consider an application that needs to increment a shared counter. This is done by first reading the counter value, incrementing it, and putting the new value with the condition that the current value of the counter be what was originally read. Conditional operations can also be used to acquire leases on certain data or change the data if a lease is held. The need for conditional operations has forced other systems to add them to their API (e.g., SimpleDB [6] and Spinnaker [108]).

Listing 4.1: Extended API example that acquires a lease using operation transactions. The caller's id is set using a put and a counter is incremented only if the lease is acquired, using a conditional put that checks the lease to be free beforehand.

  tx = new_tx();
  innesto_op(tx, table_1, COND_PUT, leaseId, 0, 1);
  innesto_op(tx, table_2, PUT, ownerId, appId);
  innesto_op(tx, table_3, INCREMENT, counterId);
  if commit_tx(tx) == true:
      // Lease successfully acquired.

Conditional operations are the only Innesto operations where the functionality differs depending on the isolation mode. In the one-round transaction commit protocol, the condition of conditional operations is checked at vote time, while the modifications are applied when the outcome of the transaction is decided, which potentially may be some time later. In lock mode, the conditions and target modifications are locked, and conditional operations can impose a condition using one key and apply a modification on another key (e.g., if item 1 has a certain value then put item 2). They can also be combined with any other operations without restrictions. In clock mode, since data is not locked, there is a window of time between voting and the outcome where the condition could be violated.
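The read-increment-conditional-put cycle for the shared counter described above can be sketched as follows. The single-process table and all names here are illustrative stand-ins, not Innesto's client API; in the real system the conditional put is voted on as part of a one-round transaction.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>

// Minimal stand-in for a key/value table supporting a conditional put:
// the write succeeds only if the current value still matches what the
// caller originally read.
struct MockTable {
    std::map<std::string, int> kv;
    std::optional<int> get(const std::string& k) const {
        auto it = kv.find(k);
        if (it == kv.end()) return std::nullopt;
        return it->second;
    }
    // Returns false (a "no" vote) if the condition no longer holds.
    // Missing keys read as 0 to keep the sketch small.
    bool conditional_put(const std::string& k, int expected, int desired) {
        if (get(k).value_or(0) != expected) return false;
        kv[k] = desired;
        return true;
    }
};

// Read the counter, increment it, and retry if a concurrent writer won
// the race; this is the classic compare-and-swap loop.
int increment_counter(MockTable& t, const std::string& key) {
    while (true) {
        int seen = t.get(key).value_or(0);
        if (t.conditional_put(key, seen, seen + 1)) return seen + 1;
    }
}
```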
Hence, in this mode conditional operations can only impose conditions and modifications on the exact same key (eliminating the window of vulnerability), and cannot be combined with any other operation in transactions.

4.5.3 Extensions

The final feature of Innesto is to allow application-specific extensions to be added to basic operations. This allows shipping computation with an operation. In the shared counter example, a special increment operation can be created that atomically increments the value of a counter without the extra step of reading it first. To add a new extension, a special handler needs to be added to Innesto servers which implements the desired functionality given the partition, operation type and optional data. Together with operation transactions, extensions bring more of the functionality of relational databases to the cloud. Listing 4.1 presents an example of combining the extended API features to acquire a lease.

Table 4.3: YCSB workload specification in terms of operation mixtures and access distribution. By default, each item has ten attributes with a total size of 1KB.

  Workload   Characteristic   Operations             Dist.    Application
  A          Update Heavy     50% Read/50% Update    Zipf     Session Store
  B          Read Mostly      95% Read/5% Update     Zipf     Photo Tagging
  C          Read Only        100% Read              Zipf     User Profile Cache
  D          Read Latest      95% Read/5% Insert     Latest   User Status Updates
  E          Range Scan       95% Scan/5% Insert     Zipf     Threaded Conversations
  F          R/M/W            50% Read/50% R/M/W     Zipf     User Database

4.6 Evaluation

We evaluate the performance of our implementation of Innesto (roughly 32,500 lines of C++ code) using the Yahoo!
Cloud Serving Benchmark (YCSB) [70], an industrial benchmark used to measure the performance of cloud storage systems. YCSB consists of 6 different workloads operating on a single table using different data access patterns which mimic real-world cloud applications (Table 4.3). YCSB also provides a low-level extensible storage interface which allows direct comparison of performance with different storage systems. We compare Innesto's performance to that of Cassandra [94], a leading industrial key/value datastore, and HyperDex [78], a new second-generation datastore.

We use a set of 9 machines, each equipped with two Quad-Core Intel Xeon (E5506) processors with 32GB of RAM and 12 TB storage, running unmodified 64-bit Linux version 3.2.0-48. On the machines we deployed Cassandra version 1.0.12, HyperDex version 1.0.rc4 and Innesto. Eight machines formed a cluster while one machine was used to run the YCSB benchmark and run the HyperDex coordinator when required.

Figure 4.4: Average throughput for YCSB benchmark. The first three (red) workloads are write heavy, the middle three (gray) are read heavy, and the right most workload (black) is search heavy.

4.6.1 Key/Value Performance

We first evaluate the average throughput of all three systems. In each run, we load the system with 10 million items (the 'Load' phase). YCSB items by default have 10 attributes with a total size of 1KB. Five of the workloads operate on keys, while workload E searches for items based on record number. All three systems were configured to tolerate 1 server failure and use their default consistency configurations, which means Cassandra provides eventual consistency while HyperDex and Innesto provide strong consistency. HyperDex was set to be searchable and Innesto was configured to use one Search Clone partitioned based on record number and use network replication.
Unless stated otherwise, Innesto uses lock mode isolation. Figure 4.4 shows our results.

Cassandra represents the range of performance cloud-scale applications require from the underlying storage system. Any new system, regardless of the new features it has to offer, should at least provide competitive performance with Cassandra. HyperDex represents the only other key/value datastore that can provide the same search capabilities as Innesto. Thus, we wish to see how Innesto performs compared to each system, where one sets performance requirements and the other sets expected features.

In Figure 4.4 we can see that for both write heavy workloads (the first three bars) and read heavy workloads (middle three bars) Cassandra provides an average throughput of 30K-45K op/s due to its weak consistency model. A write request is written to disk on the receiving server (not necessarily the target server) and the client is notified immediately, while the operation is slowly routed to its destination server in the background. Additionally, read requests don't need to wait for the latest version of an item to arrive, yielding high overall throughput for Cassandra. However, Cassandra performs poorly for scan operations (right-most bar), which are 20x slower than other workloads. On the other end, HyperDex's search throughput outperforms Cassandra's scan by a factor of 7x, due to its data arrangement, but it performs poorly for write-heavy workloads since write operations are chained serially in its value-dependent chain. Thus, HyperDex only performs well for read-heavy workloads. Innesto's operations are designed to complete in the minimum possible number of network RTTs and update Clones in parallel. Thus Innesto is able to outperform HyperDex's write and search performance, while having competitive performance in the other workloads compared to Cassandra.
Innesto has an order of magnitude (10x) higher throughput for search compared to Cassandra.

4.6.2 Load Balancing

Load balancing is an essential requirement for large-scale cloud applications, which is why most key/value stores (e.g., Cassandra) use hash based routing for statistical load balancing. Innesto performs its own load balancing in order to preserve its partition hierarchy which is used for search. We wish to see how Innesto performs as the data stored in a table grows. In the Load phase, YCSB inserts items into the same table. We measure average overall throughput every time half a million new items have been stored. Figure 4.5 shows our results. As expected, Cassandra is able to maintain a steady throughput achieved through statistical load balancing.

Figure 4.5: Temporal throughput in Load phase of the YCSB benchmark. Notice how HyperDex's performance decreases with dataset size while Cassandra and Innesto maintain a steady throughput.

HyperDex uses a central coordinator to partition a table when it is created. Thus as a partition hosts more data, performance decreases. Innesto consistently splits congested partitions and distributes them evenly between all the servers, which is why it can maintain its high throughput as the data is growing. Innesto's load balancer amortizes the cost of maintaining a partition hierarchy.

4.6.3 Clone Cost

To illustrate the costs of adding search capabilities to Innesto, we configure Innesto to run YCSB without any search capabilities (MVS only), and compare its performance when it supports search with a single Clone. We also measure performance when configured to use a Sparse Clone.

Figure 4.6 shows that adding Search Clones decreases throughput almost in half for Load. Without Clones, all Innesto transactions are designed to complete in one network RTT, providing an extremely high key/value throughput.
With the addition of Clones, transactions fall back to the two RTTs required by two-phase commit. Using Sparse Clones has more or less the same performance as using full Clones for all workloads except Workload E, which uses search. The extra RTT required for fetching search results from MVS contributes to a 30% decrease in throughput.

Figure 4.6: Average Innesto throughput for YCSB benchmark with MVS only, a full Clone, and a Sparse Clone.

4.6.4 High Dimensionality

YCSB item attributes are not searchable for normal key/value datastores. To measure the cost of high data dimensionality, i.e., data that is searchable on multiple different attributes, we vary the number of attributes stored per item (out of each item's 10 attributes). A separate Search Clone is created per stored attribute to serve alongside the Clone based on record number. We investigate how Innesto and HyperDex perform with highly dimensional data. Innesto was configured to use Sparse Clones and we measure the throughput in the Load phase when inserting 5 million items.

Figure 4.7 shows that throughput decreases as the number of clones increases for both systems. With 8 Clones Innesto performs better than HyperDex with 3, since HyperDex chains operations serially while Innesto updates the Clones and MVS in parallel. With 11 clones Innesto's throughput drops to 24% of not having any Clones. This is due to the overhead of adding extra items to each transaction multiple times (approximately 150 bytes per extra Sparse Clone). However, such a system can efficiently answer 11 different types of search requests.

Figure 4.7: Average throughput for a write-only workload of inserting 5 million items with a varying number of data dimensions.

4.6.5 Lock vs. Clock

In this section we evaluate the performance cost of locking data items compared to the clock-vector isolation method, which provides the same strong consistency guarantees. We configure Innesto to use both locking as done in all experiments so far, denoted as Innesto-Lock, and clock vectors, denoted as Innesto-Clock. We repeat the experiments of Section 4.6.1, running all YCSB workloads in each mode as seen in Figure 4.8.

Compared to locking, using clock vectors reduces throughput by approximately 31.8%. In clock mode, each server maintains a virtual time counter and refrains from processing a transaction (and all others ordered after it) until its timestamp has been finalized. Thus, even though a transaction may be ready to execute, it might be delayed by another potentially non-conflicting transaction which is still pending on a final timestamp. This creates false sharing between unrelated transactions on the same server. In contrast, locks are held at item granularity to avoid any kind of false sharing. Also, the clock mode commit protocol forces multi-server read-only transactions to complete in 2 RTTs, compared to their 1 RTT execution time in lock mode. This reduces search performance.

Lock based systems are known to start thrashing as lock contention increases.

Figure 4.8: Average throughput for YCSB benchmark for Innesto in lock mode and clock mode. Compared to lock mode, clock mode has on average 31.8% lower throughput.

To trigger lock conflicts, we modified Workload A's access distribution to be hotspot. In the hotspot distribution, given a number of items, P% of all operations only reference a fixed subset of H hot items. The client uses a thread pool to issue concurrent requests. The ratio of H to the size of the thread pool represents the probability of different threads accessing the same item.
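The hotspot access distribution just described can be sketched as a key generator: with probability P an operation targets one of the H hot items, otherwise any item in the table. The function and parameter names are ours, not YCSB's.

```cpp
#include <cassert>
#include <random>

// Illustrative hotspot key generator. With probability p the returned
// key is one of the first `hot` keys; otherwise it is drawn uniformly
// from the whole table (and may incidentally still be hot).
int hotspot_key(std::mt19937& rng, int total_items, int hot, double p) {
    std::uniform_real_distribution<double> coin(0.0, 1.0);
    if (coin(rng) < p) {
        std::uniform_int_distribution<int> d(0, hot - 1);
        return d(rng);   // hot item
    }
    std::uniform_int_distribution<int> d(0, total_items - 1);
    return d(rng);       // cold item
}
```

Raising `p` toward 100% concentrates all client threads on the same H keys, which is exactly what forces lock conflicts in the experiment above.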
With 10 million items, we set H to be one fifth of the thread pool size and varied the probability P of accessing a hot item. Thus with a high enough P, threads are guaranteed to conflict with each other, as seen in Figure 4.9.

In lock mode, lock contention increases with P, degrading performance. Under contention, three major sources of delay contribute to low performance. First, transaction messages could be received by the Participants in different orders. This causes partial lock conflicts which will result in the locks being released and the transactions being retried, adding extra RTTs. Second, under lock conflicts transactions wait an exponentially weighted random time before retrying to avoid deadlocks. Third, all locks should be acquired on all servers before a transaction can proceed. Thus, the transaction has to wait for the slowest server to keep up. Note that each server could process other transactions while waiting, but the processing time of each conflicting transaction increases. On the other hand, in clock mode transactions are globally ordered regardless of what items they touch. No matter what the access distribution, overall throughput is more or less the same.

Figure 4.9: Average throughput of Innesto in lock mode and clock mode using the hotspot distribution. The x-axis shows the probability of accessing a hot item.

Finally, to investigate the effects of the size of the hot item set, we varied the ratio of hot items H to the thread pool size with a hotspot access probability of 95%. In Figure 4.10 we see that with a low ratio, lock-based throughput drops to around 1.5K op/s while clock-based throughput remains almost unchanged.
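The exponentially weighted random wait mentioned above (the second source of delay) can be sketched as randomized exponential backoff: the retry wait is drawn from a window that doubles with each failed attempt, so two conflicting transactions are unlikely to keep colliding. The constants and function name are illustrative, not Innesto's actual values.

```cpp
#include <cassert>
#include <random>

// Randomized exponential backoff: returns a wait time in microseconds
// drawn uniformly from [0, base * 2^attempt), capped at max_us.
int backoff_us(std::mt19937& rng, int attempt, int base_us = 100,
               int max_us = 100000) {
    long window = static_cast<long>(base_us) << attempt;  // base * 2^attempt
    if (window > max_us) window = max_us;
    std::uniform_int_distribution<long> d(0, window - 1);
    return static_cast<int>(d(rng));
}
```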
As soon as the ratio increases over 1, which represents a very low probability of conflict, lock-based throughput is restored close to its maximum value.

Figure 4.10: Average throughput of Innesto in lock mode and clock mode using the hotspot distribution with a fixed access probability of 95% and a varying number of hot items.

Our experiments in this section illustrate the two ends of the performance spectrum offered by Innesto. Lock mode generally has better performance and offers more features, but overall throughput depends on the application's data access patterns. Compared to lock mode, clock mode doesn't support generic conditional operations, but offers performance independent of the type of workload. The important point is that Innesto servers can operate in both modes on the same data and dynamically switch modes at run time.

4.6.6 Summary

Our results using micro benchmarks show that for basic key/value operations Innesto performs as well as Cassandra, while having superior consistency guarantees, an order of magnitude better performance for search, and a much richer API. Innesto scales better than HyperDex with dataset size and performs better for write heavy workloads and for data with high dimensionality in every aspect.

4.7 Related Work

4.7.1 Cloud Storage & Key/Value Datastores

Traditional key/value datastores rely on two major classes of architectures. First are the Chord [112] ring-based architectures. Amazon's Dynamo [76], which serves as the basis of many cloud storage systems like Cassandra [94], is an example. It provides a schema-less data model with the main focus on availability by providing eventual consistency via a limited interface to the data. The second class is based on partitioning. BigTable [65] (and its open source implementations HBase [8] and HyperTable [22]) partitions a table into tablets.
A tablet is a group of rows in a table with predefined bounds, indexed globally, where consistency is only guaranteed within the same row. All these systems primarily operate on item keys via the basic key/value interface. Consistency is relaxed or limited to single items. While Innesto also uses partitioning, cross-partition data consistency is guaranteed via a much richer interface. Spinnaker [108] uses Paxos over a tablet-based storage system for high performance consistency. However, its focus is on replication, not supporting search.

Cassandra [94] supports secondary indexing for multi-attribute range search and uses brute force to filter out results of a single-attribute scan. Our evaluation shows that Cassandra's scan is an order of magnitude slower than Innesto's search. Newer cloud systems like PNuts [69] have moved toward using tablets to provide semi-relational interfaces. PNuts also supports single attribute range scans in the form of iterative traversal of items. However, the result of the scan does not provide the same level of consistency that Innesto search does. Megastore [51] moves beyond that to bring more relational features to NoSQL. Data is partitioned into entity groups where consistency is guaranteed within each partition but relaxed across them. ACID transactions are supported within an entity group and are heavily used in Google [20], evidence that modern cloud applications need transactions no matter how limited their functionality. Innesto supports cross-table cross-partition transactions with high performance, unlike Megastore, which has low write performance across partitions.

HyperDex [78] is a recent feature-rich system that marks a second generation of key/value datastores. It uses value-dependent chaining to serially execute transactions. This forces write operations to complete in multiple network RTTs. Innesto's operations are designed to complete in the lowest amount of time possible.
HyperDex's central coordinator partitions each table statically when a table is created. Modern cloud applications have varying load over time, which leads to high load fluctuation in HyperDex. Innesto servers continuously monitor their load and shift partitions accordingly to adapt as access patterns change. Warp [44] is HyperDex's latest addition that allows it to support ACID transactions. However, still being based on chaining, transactions are executed serially with declining performance as the number of operations within a transaction increases.

4.7.2 Transactional Datastores

Amazon's SimpleDB [6] added support for conditional operations to provide some sort of simple transactions, another indication that cloud applications need a form of transactions. The B-tree [48] based on Sinfonia [49] uses one-round transactions to implement a B+-tree which supports single-attribute range search. Due to false sharing, the B-tree has very bad write performance and can only index small (a couple of bytes) values. Minuet [111] enhances the B-tree's write performance and allows creating versioned snapshots for what-if analysis. Still, Minuet can only index small values, unlike Innesto, which performs well when storing arbitrary sized values for write-heavy workloads. Augustus [105] partitions data and uses two-phase commit between partitions to provide Byzantine fault tolerance (BFT), but the partitions are not meant for search. The commit protocol is very similar to Innesto's one-round transactions with the addition of BFT. Spanner [71] is a globally distributed database that uses Paxos for consistency, designed for geo-replication, not high dimensional data. Like Innesto, all these systems can support ACID transactions, but none of them supports multi-attribute search.

4.7.3 Peer-to-Peer (P2P)

A large body of work exists for supporting multi-attribute search in P2P using network overlays.
Arpeggio [68] uses a Chord ring and artificially inserts extra items for secondary indexing which hold lists of keys having the secondary index. Thus it can efficiently support get on exact secondary attributes but not on ranges. CAN [109] uses spatial partitioning to split a multi-dimensional key space and assign each partition to a single server. CAN maintains no hierarchy and messages are routed toward the destination in multiple hops. MAAN [62] uses a Chord ring with an order preserving hash function. Range searches are mapped to consecutive ranges on the ring where the result is the intersection of multiple different searches. Mercury [57] creates multiple order-preserving Chord rings, and a multi-attribute search is the intersection of multiple single attribute searches on separate rings. SkipIndex [126] indexes an N-dimensional key space using skip graphs (i.e., N-dimensional skip lists). It uses a partition hierarchy to execute range search. MURK [82] uses KD trees to partition the key space based on load. It uses space-filling curves to map a multi-dimensional space to a single dimensional one when constructing skip pointers between partitions.

While these systems support search, their main focus has been routing in P2P networks. Operations generally execute in multiple steps with no consistency guarantees. Innesto explicitly uses transactions to ensure a correct global serialization of operations. Also, these systems partition based on all secondary attributes, which does not scale with high dimensional data. Innesto supports efficient range searches by allowing an application to select what types of partitioning should be performed on a table. Innesto also allows a table to be optimized for different types of range searches.
Multiple different applications [48, 111] built over Sinfonia illustrate the flexibility of one-round transactions. However, Sinfonia memnodes have very limited functionality which forces bad design decisions [48, 72]. Innesto's transaction layer was inspired by Sinfonia but replaces the linear address space with flexible data types, allowing Innesto to directly intervene in the commit protocol. For example, Innesto's split takes advantage of this to efficiently execute in a single RTT, which would have taken multiple RTTs if directly implemented over Sinfonia.

Our implementation of Innesto's clock mode is based on Granola [72], which uses clock vectors to isolate one-round transactions in a lock-free manner. Our evaluation shows that Innesto's locking protocol starts to fail under high contention, while Granola does not suffer from such problems. However, since Granola transactions are executed some time after voting has finalized without any locking, Innesto cannot support generic conditional operations as part of its extended API in clock mode. Also, in the voting phase of Granola, servers directly communicate with each other and exchange O(P^2) messages for a transaction with P participants. With a high number of Search Clones this could become very expensive for Innesto. Additionally, since Granola uses a single counter per server, it causes false sharing between operations touching different partitions. We consider modifying Granola's voting mechanism to exchange O(P) messages and maintaining item counters per partition, to perform synchronization at a finer granularity and reduce false sharing between transactions, as future avenues in pursuing this work.

4.8 Conclusion

We present Innesto, a second generation searchable key/value datastore that provides the ability to efficiently search stored items based on their secondary attributes. It provides strong consistency with a transactional interface and can tolerate group failures of underlying machines.
Innesto provides a scalable cloud-based solution to applications that cannot tolerate traditional limitations of NoSQL. Innesto uses an efficient implementation of two-phase commit for high performance one-round transactions. Key/value operations are carefully designed to execute in the least amount of time possible. Innesto's load balancer allows maintaining a partition hierarchy used to quickly locate data. Our evaluation of Innesto using an industrial benchmark shows its performance competes with Cassandra while offering a new world of features.

Chapter 5

SPEX

5.1 Introduction

Millions of dollars are spent each year on infrastructures that host large-scale virtual environments (VEs), such as World of Warcraft [46] and Second Life [33]. The two common approaches taken to host multiple concurrent users in such VEs are sharding and full partitioning. Sharding entails running multiple parallel instances of the VE, each assigned to a different host machine. Users are then assigned to a specific shard and cannot migrate or interact across their boundaries [99]. Full partitioning, on the other hand, involves only a single instance of the VE partitioned into smaller regions, each handled independently. Users are then assigned to specific hosts based on their most recent locations and are transparently handed off to other hosts as they pass region boundaries [34].

In a sharded VE, user interactions are limited to those within the same shard. Though the VE may have a high concurrent user count (e.g., World of Warcraft has hundreds of thousands of active users on record [124]), each user can only observe and interact with a small fraction of other users in the world. With full partitioning, the interaction range of a user is limited to a user's current region, or at most the neighbour regions when nearing borders.
Long range interactions (e.g., looking through a telescope to a distant location) are fundamentally unsupported. Most systems also do not utilize resources effectively due to their static partitioning. Since all regions should be available, all servers need to be online even though the user distribution across regions may vary widely by date or time. This results in over-provisioning or under-utilization of the resources [96]. While changing the partition mapping offline alleviates the issue somewhat, it is a manual task and disrupts service up-time. Static partitioning also poses a limit on user density, as the maximum number of users in a region is capped to what a single core can simulate (with tasks such as game logic calculations, visibility management, or physics). Thus, densely crowded locations, such as stadiums or exhibits, are inherently difficult to realize (i.e., there is a density limit by design). Finally, First-Person Shooters (FPS) require prompt message delivery (e.g., 150ms [67]) and long-range interactions (e.g., sniper rifles). Due to the high volume of state updates per second with complicated interrelationships, large-scale VEs today cannot support FPS games without either restricting user interactions or using a centralized component.

We are motivated to remove some of these restrictions with SPEX, an infrastructure that supports scalable spatial publish/subscribe (SPS) [86]. SPEX is designed for VEs with hundreds of concurrent users. Users are allowed to roam freely in the VE without any density limitations while being able to interact with any user or region at arbitrary distances. The main contributions of SPEX are the following. First, we design an architecture that de-couples game logic processing from interest management [101], so that message routing within the VE server cluster is done very efficiently with dedicated resources. Using SPS as the primitives for interest management allows interaction ranges to be highly flexible.
The SPS service is also consistent and fault tolerant in its stored states, by using a distributed transaction provider [118]. This enables us to combine publish/subscribe (pub/sub) with spatial queries, two features traditionally treated separately (i.e., pub/sub is for transient messages, while queries are done on persistent states). Second, we combine dynamic spatial partitioning with scalable pub/sub (also known as application-layer multicast) [64] for adaptive load balancing. This provides a two-level progressive mechanism to assign loads to different hosts, and to deliver more messages should the density level require. This eliminates the density caps of users in the VE, while balancing system operation costs. Third, by adopting a fully distributed architecture, we practically scale our implementation of SPEX to tens of hosting servers, and support 750 users with end-to-end latencies within 100 milliseconds over continental distances on the Internet.

In this chapter, we first present a background on interest management and SPS in Section 5.2. An overview of techniques used in SPEX is described in Section 5.3, and Section 5.4 describes how SPS is performed. Section 5.5 presents implementation details, while use cases of SPEX can be found in Section 5.6. We evaluate our implementation in Section 5.7, compare to related work in Section 5.8 and conclude in Section 5.9.

5.2 Virtual Environments

A VE simulates an environment in which a group of actors move and interact with it based on a set of clearly defined rules, such as physical laws. Each actor encodes its behaviour in a series of events, and runs a local version of the simulation based on its observation of the last known states of the VE. Changes to the states of the VE are sent to the actors periodically via update messages.
When scaling the number of actors in the VE, communicating the latest states of the VE to the actors becomes a challenge, since the number of messages exchanged grows quadratically with the number of actors. Interest management [101] is a well-known technique to address this problem, based on the observation that while the VE may be arbitrarily large, an actor has a bounded interest range, i.e., its Area of Interest (AoI) [104]. An actor's AoI may have any possible shape and includes all the locations in the VE of interest to the actor. Actors ensure that the VE has knowledge of their latest AoI, while the VE ensures that the actors are up-to-date on the state of the VE inside their AoI.

While the concept of AoI is widely known in VE research, there are cases where it may not fully capture the requirements of a VE. For example, while an actor's visibility or field of view can be supported by having an AoI in the VE, when a task requires making an effect on an area (e.g., casting a fire spell burning everything within a radius), the concept of Area of Effect (AoE) [114] may be more relevant. Combining both the concept of AoI (i.e., a subscription of events) and that of AoE (i.e., a publication of events) yields the more general concept of SPS.

Figure 5.1: Basic architecture of SPEX. All communications are done using one-round transactions.

SPS [86] is defined as the fundamental set of functions required to fully handle interest management in a VE. SPS mandates that an actor have the freedom to specify a subscription space and a publication space of arbitrary shape. Any message sent to a publication space should be delivered to all overlapping subscription spaces. From a high level perspective, most existing VE functions can be observed as special cases of SPS functions. For example, common VE operations such as chat, trade, attack, etc., can all be built by knowing neighbours spatially, or affecting other entities within a spatial radius. The significance of SPS thus is
The significance of SPS thus isthat an architecture supporting SPS can be the basis for any kind of VE. Hence,if scalable SPS becomes possible, so will a VE with a large number of concurrentusers. In this context, SPEX is a novel approach to large-scale VEs by supportingSPS functions in a scalable and consistent manner.5.3 OverviewFigure 5.1 shows the basic architecture of SPEX. We employed a cloud architectureconsisting of SPEX hosts, a group of machines cooperating with each other toprovide a unified and fully distributed service. With a cloud architecture, we arenot concerned with bandwidth usage between the hosts, and CPU resources can be59added on the fly. Additionally, network round-trip times (RTTs) between them arein the order of milliseconds, allowing state stored on other hosts to be retrieved ina short amount of time when required.SPEX consists of three main components: proxy, partitioning and multicast.Each component is fully distributed, i.e., multiple SPEX hosts form each compo-nent. Also, a single SPEX host can be part of multiple components, e.g., be a proxyhost, partitioning host and multicast host at the same time. No host ever acts as anykind of coordinator, nor is one required to maintain any form of global state.The proxy is the frontend of the system which directly communicates withusers. A proxy host serves a subset of the users and acts on their behalf in theVE. Each user sends its commands (in the form of events), such as movements andinteractions in the VE, to its assigned proxy. The proxy handles all necessary func-tions communicating with other components and sends the required information tothe user in the form of regular updates. A user has no strict affinity to a specificproxy host and can be freely handed-off from one to another. This is essential toevenly distribute users between hosts.SPEX is designed to practically scale to tens of hosts to support hundreds oreven thousands of users in a single VE. 
SPEX's primary goal is to overcome the limitations of sharding and full partitioning. To this end, we make no assumptions about user behaviours and impose no restrictions on how densely they can cluster. Note that this does not mean SPEX will perform under any circumstances, but it will degrade gracefully under heavy load rather than break down completely. SPEX supports scalable SPS by providing a more generic API compared to traditional VE architectures. The API does not limit how users can interact with other users or parts of the VE, no matter how far apart they are.

SPS functionality is provided by using partitioning to quantize publication and subscription areas with arbitrary shapes into meaningful sub-regions of the VE. Pub/sub messages are forwarded to the hosts responsible for managing the target sub-regions. To correctly deal with potential consistency issues that could occur due to quantization and message re-ordering on different hosts, SPS functions are each executed using a separate one-round transaction, which ensures correct global ordering of events across hosts. Due to unrestricted user behaviours and interactions, potential hotspots (caused by flocking) and over-congested regions are inevitable. SPEX avoids hotspots as much as possible by increasing the quantization granularity. However, when hotspots are impossible to avoid, their effects are minimized by utilizing additional resources as much as possible to multicast relevant updates. In the remainder of this section we elaborate on the details of how each task is done.

5.3.1 Dynamic VE Distribution

Spatial partitioning is a widely employed technique where the VE is split into a grid of fixed-sized cells, triangular strips or hexagons [33, 85, 91]. Each partition represents a VE region handled by a specific host identified by a partition mapping. Partitioning reduces the workload by having actors only interact with partitions that overlap their AoI, but suffers from two major limitations.
First, selecting an optimal size for the partitions is challenging: a large size will result in a mismatch when mapping AoIs to regions, causing extra overhead when forwarding irrelevant events to users, while a small size will result in an excessive number of regions, inducing more load per actor operation. Second, when using a static partition mapping (e.g., computed offline using a hash function), there is a high workload variation across highly popular partitions (hotspots) and under-utilized ones.

SPEX avoids these limitations by using progressive partitioning combined with a dynamic partition hierarchy. With progressive partitioning, regions are partitioned based on the density of events within them. If the activity in a region exceeds a threshold, partitioning is refined by splitting the original partition into smaller ones. Conversely, inactive partitions are merged to reduce the total number of regions. This causes the number of regions to be proportional to the distribution and frequency of events, regardless of the VE size. Figure 5.2 presents an illustration of a VE partitioned using both full partitioning and progressive partitioning based on a quad-tree [80]. Each dark spot represents a source of events. With full partitioning, the number of partitions is inversely proportional to the region size regardless of the event distribution. For a large VE this could result in millions of partitions. On the other hand, with the same event distribution, progressive partitioning distributes the events more evenly between fewer partitions, and avoids hotspots as much as possible.

Figure 5.2: (left) Full partitioning vs. (right) Progressive partitioning.

The key enabler for SPEX's progressive partitioning is its dynamic partition hierarchy, which distributes partitions between the hosts.
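The density-driven splitting policy can be sketched as a quad-tree whose leaves split once their event count passes a threshold. The threshold value and the decision to split on raw counts (rather than the event rates SPEX actually monitors) are assumptions made for this sketch.

```python
# Hypothetical split threshold; SPEX's real policy is rate-based and tuned.
SPLIT_THRESHOLD = 4

class Partition:
    """One node of the progressive quad-tree; a leaf until it splits."""
    def __init__(self, x, y, size):
        self.x, self.y, self.size = x, y, size
        self.events = 0
        self.children = None            # None means this partition is a leaf

    def insert(self, px, py):
        """Record an event at (px, py), splitting if activity gets too high."""
        if self.children is not None:
            self._child_for(px, py).insert(px, py)
            return
        self.events += 1
        if self.events > SPLIT_THRESHOLD and self.size > 1:
            self._split()

    def _split(self):
        half = self.size // 2
        self.children = [Partition(self.x + dx * half, self.y + dy * half, half)
                         for dy in (0, 1) for dx in (0, 1)]
        self.events = 0                 # subsequent events go to the children

    def _child_for(self, px, py):
        half = self.size // 2
        ix = int(px >= self.x + half)
        iy = int(py >= self.y + half)
        return self.children[iy * 2 + ix]

    def leaves(self):
        if self.children is None:
            return [self]
        return [leaf for c in self.children for leaf in c.leaves()]

# A hotspot: 20 events land in the same corner of a 256x256 VE.
root = Partition(0, 0, 256)
for _ in range(20):
    root.insert(10, 10)
```

Feeding a stream of events concentrated in one corner drives repeated splits only along that corner, leaving the rest of the VE coarsely partitioned, which is the behaviour illustrated in Figure 5.2.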
Each partition maintains a pointer to its parent and child partitions in the form host id : partition id, where host id uniquely identifies the IP and port of the host and partition id identifies the partition. Figure 5.3 depicts how a VE progressively partitioned into different regions forms a hierarchy, and how the hierarchy is distributed between four SPEX hosts.

To publish an event in the VE, a host finds the most suitable partitions for the publication and forwards the event to them. The root of the hierarchy encloses the entire VE and is stored in a specific host known to all others (this does not constitute a single point of failure, due to replication as described later). The rest of the partitions can then be located by traversing tree links starting from the root in logarithmic steps. In practice, upper layers of the hierarchy tend to change infrequently. Thus each host optimistically caches these parts of the hierarchy, providing replication. This improves performance by reducing the communication needed among hosts when locating a partition. However, the structure of the lower parts is constantly changing. These changes are lazily propagated to hosts to avoid excessive replication overheads. If a host tries to publish an event using a stale version of the hierarchy, the publication is rejected and the host is told which parts of the hierarchy have changed.
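The optimistic-cache protocol can be illustrated with a toy version check: a publish carries the version of the partition the sender believes is current, and a mismatch rejects the publish and reports the up-to-date version. The partition name, reply shape, and versioning scheme here are invented for the sketch, not SPEX's wire format.

```python
# Toy stale-cache rejection; names and reply format are illustrative only.
class PartitionDirectory:
    """Authoritative view of one host's partitions: id -> current version."""
    def __init__(self):
        self.versions = {"root/nw": 3}

    def publish(self, partition, version, event):
        """Deliver `event` only if the sender's cached version is current."""
        current = self.versions.get(partition)
        if current != version:
            return ("rejected", current)   # sender must refresh its cache
        return ("delivered", current)

directory = PartitionDirectory()
stale = directory.publish("root/nw", 2, "move")         # cached version was stale
fresh = directory.publish("root/nw", stale[1], "move")  # retry after refreshing
```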
The host will then have to update its cache before retrying.

The distributed hierarchy provides a scalable way to locate partitions with low latency while not requiring any component to have a global view of the system. It is also used to answer spatial queries, e.g., finding entities along the path of a bullet.

Figure 5.3: Distributed dynamic partition hierarchy.

5.3.2 Consistency & Fault Tolerance

An important issue that threatens the practicality of a distributed VE is consistency. While some inconsistencies may go unnoticed by users or only somewhat degrade the VE's enjoyability, others endanger the correctness and operability of the system. Two major sources of inconsistency exist in a distributed VE. First are inconsistencies in the VE states caused by conflicting interactions between actors. For example, if two or more actors try to perform the same operation, like picking up the same item or shooting at the same actor, there needs to be a strict and deterministic mechanism to reject all but one operation. Second are inconsistencies in the infrastructure caused by conflicting operations by the hosts due to incorrect assumptions. As an example, consider the scenario where one host uses a stale version of the partition mapping (i.e., a wrong assumption) to publish events to partitions that have previously been split into smaller ones or deleted/merged by other hosts. Such an operation should be cleanly and completely dealt with, without any side effects.

Even when the assumptions are correct, a more subtle form of inconsistency arises when a single event triggers a host to send messages to multiple other hosts. Since there is no guarantee on what order each host will process the different messages it receives, inconsistencies could result from this lack of consistent event ordering. Consider when host S1 needs to send messages MS1A, MS1B to hosts A and B, respectively. At the same time, host S2 also needs to send messages MS2A, MS2B to A and B.
With simple message passing, there is no guarantee that A and B will both see S1's message before S2's message, or vice versa. They might see them in different orders, which means that they will process them in different orders. Inconsistencies would show if the messages are in conflict, e.g., trying to pick up the same item as explained earlier, as each server could award the item to a different player. This situation is aggravated as the number of recipients increases.

We note that distributed VE systems targeting scalability often do not explicitly deal with consistency. In the case of partitioning and sharding, interactions are limited to the current partition or shard, so consistency is implicitly ensured by design through the sole owner of the partition or shard. Other systems that try to provide a single VE instance, e.g., EveOnline [17] or Pikkotekk [27], ignore event ordering issues, but events also do not trigger message delivery to multiple servers (i.e., AoE is limited). Hence they avoid the second form of inconsistency. SPEX is designed to surpass the limitations of similar systems. Interactions are not confined to a single shard or partition, nor are AoEs restricted (via the support of generic area publications). Therefore, we carefully built a consistency layer into the architecture of SPEX which explicitly deals with all forms of potential inconsistency.

SPEX addresses consistency by using a distributed transaction provider, a light-weight wrapper around the transport layer that acts as a mediator between the hosts. It prevents inconsistencies from propagating into upper layers. A standard API for a transport layer is to provide a means to "send message M from sender S to receiver R". At some point afterwards, the message is delivered to a message handler on receiver R. The transaction provider offers a similar yet more powerful interface. First, it allows the sender to also state any assumptions they might have about the message along with it.
The message is only delivered if the assumptions hold at delivery time. Second, it allows a compound message to have multiple recipients. That is, a message can consist of smaller messages, each of which is delivered to a different receiver. Finally, it ensures atomic and globally ordered delivery of the message. All recipients will receive the messages in the exact same order, or not at all. In the case of delivery, the message is delivered to the same message handlers of the receivers. The transport API offered by the transaction provider is hence transformed into "send message M from sender S to receivers R1,...,Rn only if conditions C1,...,Ck hold".

Using the distributed transaction provider, the actor conflict example is resolved by having an actor send the message "pick up item only if it's still available". The host conflict is also resolved by each host publishing events like "publish event to partition only if it hasn't changed". Rejected messages are returned to the sender, which should take appropriate action. An actor failing to pick up an item knows that it should not add the item to its inventory, and the host realizes that it should update that part of its cache. However, the benefits offered by the transaction provider are not free. They come at the cost of potential extra delivery time caused by the complexity of the underlying protocol and synchronization overhead. Nonetheless, the extra delay is on the order of a network RTT, at most a few milliseconds in SPEX's architecture.

The distributed transaction provider may also provide protection against failures. Since all important communications are transmitted through it, each host may be seamlessly backed up with a number of mirror hosts. The transaction provider will ensure that backups are in perfect sync with the primary. In the event of the failure of a primary, a backup will take over and operations will resume.
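The transaction provider's interface, "send message M from sender S to receivers R1,...,Rn only if conditions C1,...,Ck hold", can be modelled in a single process as follows. The sketch captures only the semantics (conditional, atomic, globally ordered delivery); the distributed protocol that implements them is elided, and all names are illustrative.

```python
# Single-process model of the transaction provider's semantics; illustrative.
class TransactionProvider:
    """Conditional, atomic, globally ordered delivery of compound messages."""
    def __init__(self):
        self.handlers = {}   # receiver id -> message handler
        self.log = []        # the single global delivery order

    def register(self, receiver, handler):
        self.handlers[receiver] = handler

    def send(self, parts, conditions):
        """parts: {receiver: message}; conditions: zero-argument predicates.

        Delivers to all receivers only if every condition holds at delivery
        time; otherwise the compound message is rejected as a whole."""
        if not all(cond() for cond in conditions):
            return False                 # rejected: returned to the sender
        self.log.append(parts)           # log order is the global order
        for receiver, message in parts.items():
            self.handlers[receiver](message)
        return True

# Two actors race to pick up the same item; only the first-ordered one wins.
item_available = {"sword": True}
winners = []

def arbiter(msg):
    item_available[msg["item"]] = False
    winners.append(msg["actor"])

tp = TransactionProvider()
tp.register("arbiter", arbiter)
ok1 = tp.send({"arbiter": {"actor": "a1", "item": "sword"}},
              [lambda: item_available["sword"]])
ok2 = tp.send({"arbiter": {"actor": "a2", "item": "sword"}},
              [lambda: item_available["sword"]])
```

Whichever pickup the provider orders first succeeds; the other is rejected and returned to its sender, exactly as in the item-pickup example above.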
Seamless failover through the transaction provider obviates the need for redundant failure protection mechanisms in other components.

5.3.3 Adaptive Load Balancing

The load on a VE infrastructure varies drastically over time. The total number of users changes with time, and users tend to cluster in different locations based on current events [96, 99]. Having the ability to shift load between hosts is essential for both scalability and cost efficiency. Since the processing power of any given host is limited, when a host's load reaches a limit, it should be redistributed. Conversely, when the overall capacity of the hosts exceeds the total system load, load should be shifted away from under-utilized hosts so they can be safely terminated to reduce costs.

We classify load into two different components. First is distributing a large number of users between hosts. With thousands of users, tens of thousands of update messages arrive at SPEX every second, and it might stream millions of messages back. We handle this by distributing the users between proxy hosts, and use progressive partitioning to reduce the communication complexity, quadratic in the number of users, by forwarding events to a bounded number of relevant partitions. When a partition's load exceeds a threshold, the partition is split into smaller ones. The load balancing component ensures that the new partitions are distributed between different hosts to spread the load as evenly as possible. The dynamic partition hierarchy allows the partitions to have no affinity to a specific host, so that partitions may fluidly migrate between machines. This is especially useful when the total load of multiple partitions located on the same host exceeds its capacity, yet none is individually high enough to be split and redistributed.

The second load component occurs when no amount of partitioning can reduce the workload to match a host's capacity.
A good example is the ice rink in a hockey arena, a single over-important hotspot to which most of the actors publish or subscribe. Another example is the flag carrier in a capture-the-flag match [55]. This form of load is why partitioned VEs limit the maximum actor density in the VE. SPEX handles this by reinforcing the partition hierarchy with scalable pub/sub.

Scalable pub/sub (e.g., Scribe [64]) has been used for hosting distributed VEs in a myriad of different architectures [91, 104], where a topic (or channel) represents a source of events. Scalability is achieved by allowing multiple hosts to jointly disseminate a topic's messages by forming a reverse forwarding tree for each topic. This allows more hosts to help forward events to a subset of the recipients, as opposed to one host forwarding to them all. However, it comes at the cost of extra network hops per publication (another reason we employed a cloud architecture to minimize these costs). Still, scalable pub/sub is not suitable for hosting a VE on its own. It provides a discrete set of topics onto which continuous AoIs and AoEs somehow need to be mapped, which is a non-trivial task. Additionally, a VE based on pub/sub alone has difficulties answering spatial queries, and inconsistencies may result from conflicting publications with nothing there to prevent them.

SPEX already handles consistency issues and has a partitioning component that maps areas to partitions. Thus scalable pub/sub fits in nicely with these components. Each partition is associated with its own topic, which is created and destroyed along with it. Any events that occur in the partition are published to and disseminated by the topic. As we will see in our evaluation, adaptive load balancing plays an important role in hosting regions with densely clustered actors.

5.4 SPS Functionality

Proxy hosts receive events from users and act on their behalf in the VE.
They deal with player churn and maintain enough player information to be able to translate application events into SPS functions, as described next.

5.4.1 Spatial Publications

Publishing an event requires finding the list of partitions that intersect the area of the publication. For point publications the target will be a single partition, the lowest one in the hierarchy (a leaf in the tree). For area publications the host must publish to the set of all leaf partitions that intersect the area. Publishing to non-leaf partitions would create parallel paths for publications, i.e., publishing to either of the ancestor partitions of a leaf. The transaction provider cannot detect the dependency between publications on different paths, which would lead to inconsistencies. Thus we do not allow publications to non-leaf partitions.

Partitions are found through their parents using the partition hierarchy. Results are cached for future publications to increase locality. Once found, a one-round transaction is used to write the publication to the partitions. The transaction provider handles synchronizing messages on different hosts. When the transaction is executed, the event is written to the topics associated with the partitions involved in the publication, which disseminate it to other hosts interested in the event. These hosts in turn forward it to appropriate users.

5.4.2 Spatial Subscriptions

Proxy hosts are responsible for interest management of their users. They handle user subscriptions by maintaining a per-user subscription list, a list of all topics subscribed to by the respective user. The host then manages its own subscription list as the union of all the subscription lists of its users. It subscribes to each topic inside its subscription list. Once a publication is received for a topic, the event is forwarded to all subscribed local users.

The unit of subscription is a partition. Subscribing to a partition's topic is done using the partition hierarchy.
A subscription list is populated by intersecting the subscription areas with partitions. This is done recursively. The subscription process starts at the root partition, which covers the entire VE space. For each non-leaf partition, the subscription is then refined by also subscribing to intersecting child partitions, while maintaining the subscription to the current partition. This allows hosts to have no global view of the VE but to quickly discover unknown parts as needed. As actors move in the VE their AoIs change. Modifying a subscription is done by computing the difference between the subscription areas of the old AoI and the new one, then subscribing/unsubscribing to the topics accordingly.

Modifications to the partition hierarchy as a result of splitting or merging partitions are propagated to the subscribers in the form of special events. When a partition is split, the event will cause subscribers to refine their subscriptions by additionally subscribing to the newly created partitions. For a merge, new publications will arrive on the parent partition's topic. Recall that we do not allow publications on non-leaf partitions. This is a signal that the child partitions have been destroyed and the partition is now a leaf. Upon receipt, subscribers to the destroyed child partitions will unsubscribe from them.

5.5 Implementation Details

5.5.1 Spatial Partitioning

Our spatial partitioning component uses Innesto [117]. Innesto is a distributed key/value store that uses spatial partitioning. We use an event's spatial coordinates as its key and the actual event as the value. Innesto supports keys with any number of dimensions, so it can support a VE with any number of dimensions (e.g., 2D, 3D, etc.). We added a publish function to Innesto.
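The recursive refinement described above can be sketched as follows: starting from the root, every intersecting partition is subscribed to, and non-leaf partitions are further refined into their children. The fixed toy hierarchy and the use of region tuples as topic names are assumptions of this sketch.

```python
# Recursive AoI-to-topic mapping over a toy quad-tree; names are illustrative.
def overlaps(a, b):
    """Open-interval overlap test for rectangles (x0, y0, x1, y1)."""
    ax0, ay0, ax1, ay1 = a
    bx0, by0, bx1, by1 = b
    return ax0 < bx1 and bx0 < ax1 and ay0 < by1 and by0 < ay1

def children_of(region):
    """Quad-tree children of a square region (x0, y0, x1, y1)."""
    x0, y0, x1, y1 = region
    mx, my = (x0 + x1) // 2, (y0 + y1) // 2
    return [(x0, y0, mx, my), (mx, y0, x1, my),
            (x0, my, mx, y1), (mx, my, x1, y1)]

def subscribe(aoi, region, leaves, out):
    """Collect topics for every partition intersecting the AoI.

    Non-leaf partitions are refined recursively while the subscription to
    the current (ancestor) partition is kept, as described in the text."""
    if not overlaps(aoi, region):
        return
    out.append(region)
    if region not in leaves:
        for child in children_of(region):
            subscribe(aoi, child, leaves, out)

root = (0, 0, 256, 256)
# Toy hierarchy: only the north-west quadrant has been split a second time.
leaves = {(0, 0, 64, 64), (64, 0, 128, 64), (0, 64, 64, 128), (64, 64, 128, 128),
          (128, 0, 256, 128), (0, 128, 128, 256), (128, 128, 256, 256)}
topics = []
subscribe((10, 10, 70, 70), root, leaves, topics)
```

An AoI straddling several small partitions subscribes to each of them plus their ancestors, while quadrants it never touches are never discovered.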
The publish function uses an event's key to compute the partitions that should receive the event and delivers it to each partition's publication handler.

A publication handler is hooked to the root of a topic in the scalable pub/sub component, which handles pub/sub delivery. The publication handler monitors the rate of events published to its partition and decides when to split the partition. It also handles publishing special events upon structure modifications. A partition is versioned with a modification counter that is incremented each time a split or merge occurs. The version number is used to identify publications that have used stale cached entries of the distributed partition hierarchy. A publication is rejected if its provided version number mismatches the latest value. This protects the partition hierarchy from race conditions.

5.5.2 Scalable Pub/Sub

The scalable pub/sub component implements Scribe [64]. The hosts form a ring based on their unique host ids. Each partition serves as a different topic. The basic functionality is to subscribe to and unsubscribe from different partitions. All events that occur in a partition are published to its topic. Subscription requests are routed across the ring with hops of exponentially decreasing size until they reach either the destination or an intermediate host that already has a subscription to the topic. When an event is published, the publication is forwarded along the reverse path the subscriptions came through. This uses more hosts to help forward the events, which scales as the topic becomes more popular.

5.6 Discussion

One important ability of SPEX is to decouple publications from subscriptions (i.e., AoEs from AoIs). In most existing systems [33, 56, 83], the AoI of a user is tightly coupled with its position in the VE, i.e., a user publishes its location and its AoI is assumed to be the area surrounding its position in the VE. SPEX allows users to fully decouple the two.
A user can publish to any location within the VE, while its AoI could be in another region of the VE. This decoupling allows SPEX-based applications to implement new features previously not possible. For example, a user could be observing a distant location through a telescope. It could also interact with the distant location by shooting at it with a long-range sniper rifle. We discuss SPEX's specific use cases as follows:

(a) Client/Server: SPEX can replace the relay/messaging server in client/server architectures, e.g., FPS games. Instead of connecting to a single server, the players connect to SPEX hosts for interest management. Additionally, server-side arbitrators could be plugged in to subscribe to regions and handle event processing based on game logic [86] (e.g., physics). Such an architecture could be run in a dedicated environment to provide epic-scale cloud-based gaming over a WAN [104]. SPEX can also be used for LAN games, e.g., a larger-scale virtual city such as the game Grand Theft Auto, where not all actors are human controlled (Non-Player Characters or NPCs) and their total number exceeds what a single server can handle.

(b) Peer-to-Peer (P2P): P2P games distribute the VE between peer (i.e., client) machines [91], each acting as a server for parts of the VE. Each peer then has to find other peers hosting events of their interest (i.e., the neighbour discovery process), which presents a major challenge in P2P gaming. SPEX could be used as an external service to augment existing P2P architectures. Peers can simply use SPEX for neighbour discovery. They will update their states in SPEX at a low frequency [55, 104], providing a pointer to themselves (i.e., IP and port), and SPEX will find their neighbours for them. A peer will then directly contact found neighbours for higher frequency updates as it normally would in a purely P2P architecture.

One could also imagine using SPEX under a hybrid architecture.
For example, some peers behind NATs may have problems allowing other peers to connect. Others might not have enough networking resources to serve peers. In a hybrid architecture, a peer is given the option to either serve game states itself, only storing a self pointer in SPEX for free, or to host all its states in SPEX and let SPEX serve the peers on its behalf, an option which may require a paid subscription.

5.7 Evaluation

In this section we evaluate SPEX. We emphasize that our evaluation is based on an actual implementation of SPEX running on real machines, not a simulation. We thus obtain results that are more realistic and could be of value to system developers. We use two evaluation platforms. Sections 5.7.1, 5.7.2, 5.7.3, 5.7.5 and 5.7.6 use the testbed setting: a set of 22 machines with Pentium IV 3200MHz processors and 1GB of RAM, connected with Gigabit Ethernet. Section 5.7.4 uses the EC2 setting: 60 High-CPU Extra Large Amazon EC2 instances [3], each having 8 virtual cores with 2.5 EC2 compute units of processing power and 7GB of memory, to repeat some tests at a larger scale with more actors. Half of the machines in each experiment are used to host SPEX and the other half act as clients. Each client machine emulates multiple actors, allowing us to evaluate SPEX with a larger number of actors than the available machines.

The purpose of our evaluation is to measure four different aspects of SPEX:

• Responsiveness, by measuring end-to-end latencies of event delivery.

• Scalability, by investigating the effects of varying the total number of actors on resource usage.

• Correctness, by measuring accuracy, a metric obtained by comparing SPEX's AoI matching to that of an ideal system.

• Stability, by measuring SPEX's performance variance over long periods of time and non-uniform load.

We borrow evaluation parameters from one of the most popular large-scale VEs, Second Life [33]. Actors are randomly spread across the VE with AoIs of radius 64 units surrounding their position.
An actor walks in the VE, updates its position and forwards its new position and updated AoI to its associated host. Actor positions are published as new events using point publications, and each actor's subscription list is updated as an area subscription. To provide a highly dynamic fast-paced VE, we use FPS-grade update rates for the actors to move and change their AoIs, i.e., 10 moves and AoI modifications per second.

Changing the dimensions of the VE allows us to control the density of the actors. In Second Life, a server is assigned a 256×256 square region which can support at most 100 actors [34]; 16 Second Life servers can host a VE of size 1024×1024 and 64 can support 2048×2048. Using our SPEX hosts, we create VEs with these three sizes, and vary the number of actors in the VE in separate experimental runs. We investigate two types of actor behaviours: random way-points and cluster movements. In random way-point, an actor selects a random destination in the VE and moves toward it, selecting a new one once it arrives at its current destination [83, 104]. In cluster movements, the actors tend to visit certain hotspots more often than other places, causing them to cluster in the hotspots more densely [86, 87, 99]. We first start our evaluation with random way-points in Sections 5.7.1-5.7.4, then compare results to cluster movements in Section 5.7.5.

Each run lasts for 120 seconds [86, 87] and the VE starts as a single partition. We clip out the first 20 seconds to capture 100 seconds of run time; thus we give SPEX's load balancer 20 seconds to quickly partition the VE and balance it between the hosts. Each run is repeated 5 times to average out random actor placement effects. The duration of 120 seconds was selected to allow SPEX to stabilize, while keeping collected data traces within reasonable sizes. Nonetheless, we investigate SPEX's performance variations over a full hour in Section 5.7.5.
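The random way-point behaviour used in the evaluation can be sketched in a few lines, assuming a fixed movement speed and uniform destination selection (parameters the text does not specify):

```python
# Random way-point movement sketch; speed and seed are illustrative values.
import math
import random

def random_waypoint(steps, size=1024, speed=5.0, seed=42):
    """Generate `steps` moves of one actor under random way-point movement."""
    rng = random.Random(seed)
    x, y = rng.uniform(0, size), rng.uniform(0, size)
    dest = (rng.uniform(0, size), rng.uniform(0, size))
    path = [(x, y)]
    for _ in range(steps):
        dx, dy = dest[0] - x, dest[1] - y
        dist = math.hypot(dx, dy)
        if dist <= speed:               # arrived: snap to dest, pick a new one
            x, y = dest
            dest = (rng.uniform(0, size), rng.uniform(0, size))
        else:                           # move `speed` units toward dest
            x, y = x + speed * dx / dist, y + speed * dy / dist
        path.append((x, y))
    return path

path = random_waypoint(100)
```

At 10 updates per second, each position in the path would be published as a point publication and accompanied by an AoI modification.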
Section 5.7.6 presents results on the synchronization overheads of using one-round transactions instead of direct message passing.

5.7.1 Processing Latency

The quality of experience (QoE) of actors in a VE is known to be highly sensitive to end-to-end delay. End-to-end delay is the time between when an actor causes an event and when it is observed by others. Previous studies have found an upper bound of 150ms on end-to-end delay for FPS games as the border of enjoyability of the game. Anything above that starts to drastically degrade QoE [67]. In a breakdown of end-to-end delay, a message has to propagate from the user machine to a SPEX proxy host (network delay), spend some time inside SPEX being duplicated and forwarded to other proxy hosts (processing delay), and incur another network delay back to other user machines.

Figure 5.4: 99th percentile SPEX processing delays in the testbed.

We measure end-to-end delay as the time it takes for an event to leave one user machine and arrive at another, and measure SPEX processing delay as the time it takes for one SPEX host to internally deliver a publication to others. Together, both measurements allow us to quantify how the actors observe events in the VE, and how much of that is due to the distributed architecture of SPEX. If SPEX processing delay is low enough, it leaves enough time for network delays while still adhering to the bound on end-to-end delay. To obtain these measurements, we synchronize the clocks on the evaluation machines using NTP. Each event message contains two timestamps. One corresponds to the time it leaves the user machine and one to when it arrives at SPEX. The timestamps are used to calculate a message's delays. We present two characteristics of delay. Mean represents the mean delay computed for all messages in a run.
We also compute an upper bound on the 99th percentile delay, measured as the 99th percentile of the maximum delays measured in 1 second intervals at each machine. The error bars represent minimum and maximum measurements across the runs of each setting.

Figure 5.4 presents our latency results for different VE sizes and different numbers of actors. Mean SPEX processing delays (not shown) are well under 4ms; the 99th percentile is an order of magnitude larger but still less than 50ms. As the number of actors in the VE increases, the average number of actors within the AoI of any actor increases. This puts more load on the hosts, which increases processing delay.

Our latency results are significant in three important ways. First, SPEX's short mean delay allows us to use it for any type of application, even epic-scale FPS games, as it leaves ample time for network delays. Second, by using more hosts, SPEX can scale a single Second Life region to host more actors, enabling more congested areas. An interesting extension is for Second Life to keep its current partitioning method (i.e., by regular grids), but assign each partition to a separate SPEX instance rather than a single core, allowing a set of SPEX hosts to handle the interest management for a single Second Life region. Third, to host a VE of size 2048×2048 in Second Life, the operators would normally have to use 64 cores. In SPEX, the number of hosts can dynamically scale from 1 to 64 to match current demand. Thus, an operator can balance system cost and performance with SPEX.

The per-second update frequency of each actor directly impacts performance. More updates imply that actors publish and change AoIs more frequently, putting more stress on the system to deliver more events and change subscriptions more frequently. We thus investigate the effects of update rates on maximum SPEX processing delays with a map size of 1024×1024 in Figure 5.5.
Increasing the update rate increases the 99th percentile worst delay at larger scales. With 20 updates per second, average worst-case delays are close to 100ms with a mean below 3ms. The higher update rates push the host CPU to full saturation, increasing delay. We expect SPEX to perform better if more hosts were available.

Figure 5.5: 99th percentile SPEX processing delay in the testbed with different update rates at map size 1024×1024.

5.7.2 Bandwidth

Physical resources available on each machine are limited. We need to ensure that a host's bandwidth usage does not exceed what is available. We measure bandwidth as application throughput sent to and received by the machines. For an actor, it simply consists of the data stream uploaded to and downloaded from its associated SPEX host. A SPEX host's bandwidth consists of the data it communicates with all its actors and the data it communicates with other SPEX hosts.

Figures 5.6a and 5.6b illustrate the average bandwidth measurements for each host. Bandwidth grows linearly with the number of actors, since the VE's states grow with the number of actors, and more events have to be delivered to more targets. This shows that SPEX's interest management has reduced bandwidth growth from quadratic to linear. In a more congested VE, an actor is subscribed to more events than in an uncongested VE. As evident in Figure 5.6c, which presents average actor download bandwidth, with a smaller VE the number of events delivered to an actor is significantly larger than in larger VEs. However, the required bandwidth for the actors is within the range available to residential users. As for the hosts, part of their traffic traverses the internal network (LAN).
The rest is streamed over the Internet, but does not exceed the bandwidth available to commercial services.

Figure 5.6: SPEX host and actor average bandwidth in the testbed. (a) Host upload; (b) Host download; (c) Actor download.

5.7.3 Accuracy

In this section we quantify how close the correctness of SPEX is to that of a hypothetical ideal system, to ensure that SPEX does not trade correctness for performance. In an ideal system, all events forwarded to an actor are correctly matched to its AoI. SPEX is a fully distributed system, and various delays in the system could cause forwarded events to sometimes mismatch the latest AoI. We define accuracy as the number of correct events the actors receive divided by the total number of events they receive. A correct event is defined to be one that would be delivered by a hypothetical ideal communication system that operates with zero delay. Events that were delivered but should not have been, and events that should have been delivered but were not, count as incorrect events. Accuracy is the only way to measure the correctness of a distributed architecture, ensuring no events are lost or incorrectly disseminated in the system.

There are two sources that contribute to inaccuracy in SPEX: event processing delay and subscription delay. With processing delay, an event arrives at one host and spends some processing time going through various components before being delivered to other hosts.
The longer it takes for the event to reach its destination, the higher the likelihood that the target actors have changed their AoIs in the meantime. Thus an event delivered in an ideal system might not be delivered in a system with processing delay. When an actor makes a movement and updates its AoI, its host immediately subscribes to topics that are in the new AoI and unsubscribes from topics that are no longer in the actor's AoI. In practice, there is a delay for the pub/sub component to fix the reverse forwarding tree to include/exclude the host, something that would happen immediately in an ideal system. This is what we refer to as subscription delay. The longer it takes for subscriptions to be updated, the higher the likelihood that the actor receives events that match its old AoI but not its current AoI, or misses events that occurred in the new AoI before the new subscriptions take effect.

We compute server-side accuracy offline using two different timestamped logs stored by the hosts per actor: a position log, recording the actor's position in the VE, and an event log, recording any event that was ever delivered to the actor. Using the position logs we fully reconstruct the global state of the VE at any given time. We use this state to find the ideal set of events each actor should have received (i.e., each actor should receive relevant events within 100ms). By comparing this ideal matching to the event log we identify any missing or incorrectly forwarded events (i.e., events that should not have been received or were not received) and compute accuracy. SPEX is streaming hundreds of megabits of data per second, and for accuracy we have to log every event for the entire experiment duration, which severely affects performance. So we only compute accuracy up to 275 actors.

Figure 5.7: SPEX accuracy in the testbed.

Figure 5.7 illustrates the accuracy of SPEX.
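The offline accuracy computation described above can be sketched as follows. The sketch assumes events are identified by ids and that the ideal set has already been reconstructed from the position logs; counting both spurious and missing events as incorrect is one way to realize the definition in the text:

```python
def accuracy(received, ideal):
    """received: ids of events an actor actually got (its event log).
    ideal: ids of events a zero-delay system would have delivered,
           reconstructed offline from the position logs.
    Spurious events (received but not ideal) and missing events
    (ideal but never received) both count as incorrect."""
    correct = len(received & ideal)
    incorrect = len(received ^ ideal)  # symmetric difference
    total = correct + incorrect
    return correct / total if total else 1.0
```

With this formulation an actor that receives exactly the ideal event set scores 1.0, and every spurious or missed event lowers the score symmetrically.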
For all VE sizes and all numbers of actors, accuracy is over 90%. When the VE is large enough, SPEX is over 96% accurate. Only in the extremely small VE of 256×256 does accuracy drop to 91%. Recall from Figure 5.4 that processing delay starts to increase at this VE size. Accuracy highly depends on system delays, as confirmed by our results. Previous studies have shown FPS games can cope with loss rates of 5%–10% as long as events are delivered within 100–150ms [53]. Compared to studies on FPS games and to related work [55, 86], we consider SPEX's accuracy to be high enough.

5.7.4 Cross-Continent Latency

We now report the experiments in EC2, using almost 3 times more SPEX hosts and client machines to test SPEX at VE scales never previously tested. SPEX hosts are located in Amazon's US-EAST region (Virginia), while the client machines are located over 4000 kilometers away from the hosts in the US-WEST region (California). We found the base round-trip delay between the clients and the hosts, measured using ping, to be around 85ms. This distance allows us to capture the effects of communicating across a continent on end-to-end latency. Using 60 EC2 instances for multiple hours is costly, so only a VE of size 1024×1024 is used to re-run the experiments of Section 5.7.1 with an average of 2 runs.

Figure 5.8: End-to-end delay across the continent in EC2.

At the largest scale of 750 actors, mean end-to-end delays are under 91ms, enough to host an epic-scale FPS game. Note that there is a base 85ms delay from a client to a host and back to another client, meaning update messages spend an average of less than 6ms inside SPEX. Figure 5.8 illustrates various percentiles of the worst case end-to-end delay. Parts of these worst-case delays are due to overcrowded regions in the VE, where the event is being forwarded between the hosts inside SPEX.
The rest are due to unpredictable events in the Internet, e.g., TCP retransmits.¹ For example, with 750 actors, the 95th percentile of worst case delay is around 215ms. From an actor's perspective, this implies that out of 100 measured intervals, events over 215ms old are received in only 5, and in the other 95 intervals all events are on time. From the host's perspective, every second, out of 750 actors, only 37.5 actors (i.e., 5% of them) receive events older than 215ms. Based on our EC2 setup, the distance between the hosts and clients is large enough to cover North and Central America, and even parts of South America. Users in these regions should expect a quality of service at least as good as what we present. More distant clients would observe an added delay (due to distance) compared to our results.

¹ All streams between the SPEX hosts themselves, and between hosts and their clients, use TCP to be safe over public networks, i.e., EC2 and the Internet, respectively.

5.7.5 Stability

All our experiments thus far started with a single un-partitioned VE and pushed SPEX's load balancer to quickly repartition the VE and distribute it between hosts. In this section we measure the robustness of the overall architecture, and validate that our experiment settings do not bias in favour of SPEX. We would like to ensure that our experiments have fully and correctly captured SPEX in its steady-state operation.

We start with 275 actors in the testbed setting, the largest setting at which we can fully log the system state (Section 5.7.3), and re-run all experiments for a full hour to measure variance in SPEX for all world sizes. If we observed significant variance in the measurements over time, our experiments would have failed to capture the steady-state characteristics of SPEX. Since the hosts are streaming hundreds of megabits of data per second, it is not possible to log everything for an hour. Hence, we periodically measure the system's behaviour.
For latency, every 2 minutes we compute SPEX's mean and max processing delay as the average and maximum processing delay of all updates within the last 2 minutes, respectively. For accuracy, every 400 seconds we log all required information for 30 consecutive seconds, thus sampling SPEX's accuracy approximately every 7 minutes. As Figure 5.9 shows, we found performance samples to be consistent over time with no significant changes. Within one hour, mean delay samples varied by less than 0.5ms and accuracy by less than 1%. Our results here confirm that the evaluations in previous subsections have correctly quantified SPEX's performance.

As a last step to validate our experimental setup, we investigate the effects of actors clustering in hotspots. In the 256×256 world size (already a congested area), we set up 1, 4 and 8 hotspots to represent locations in the VE that actors tend to visit more often. This causes regions around the hotspots to have a higher density of actors. With 1 hotspot, an actor is either at the hotspot and goes to another random location, or is not at the hotspot and elects to go to the hotspot 50% of the time. With 4 and 8 hotspots spread in the VE, an actor elects to go to a randomly selected hotspot with high probability (80%), and otherwise goes to a random location [86]. We ran the experiments for a full hour. As shown by Figure 5.10, with 1, 4 and 8 hotspots we found the mean delay rising to 7–8ms. Maximum delay still varied mostly between 40ms and 60ms, and accuracy was still maintained at around 91–92%. Hence SPEX's load balancer successfully managed actors clustering in the VE. We believe this clustering is one of the worst loads actors can impose on a VE. We expect SPEX's behaviour under other types of clustering (e.g., more or fewer hotspots, larger world size, etc.) to fall somewhere between that of the uniform distribution and a single hotspot.
Our results in this section validate the duration and load of our experiments: we observe no significant change in delay or accuracy over time or under non-uniform load.

5.7.6 Synchronization Costs

In our evaluation we have measured SPEX's latency under a workload where the actors produce point publications and have area subscriptions. We have not yet investigated area publications. This is because defining area publications, how they relate to the VE and the effects they might have on other actors, is extremely application specific and cannot be easily mimicked. An area publication, depending on its size, may result in a transaction that spans multiple partitions. Since the partitions could reside on different hosts, the transaction could touch several hosts. Hence, in this section we quantify the effects of using one-round transactions for synchronizing generic events that span multiple servers. We wish to measure the basic overhead of using the transaction's commit protocol to ensure global serialization. Comparison is done against the alternative of simply sending messages to the servers in the form of one-to-many communication without any predictable ordering. We leave evaluating synchronization overheads with meaningful area publications as future work.

In the testbed setting, 11 machines act as servers while 5 client machines issue an aggregate of 1000 transactions per second. We vary the number of servers each transaction contacts from 1 (a normal message) to 11 (an event that affects partitions on all servers). Note that 11 represents extreme all-to-all communication. Each transaction delivers a simple 8-byte timestamp, generated at commit time, to its target hosts. It is designed to be small so as not to saturate the machines' I/O.
Figure 5.9: SPEX performance over a period of 1 hour of run time with random waypoints. (a) Mean processing delay; (b) Max processing delay; (c) Accuracy.

Figure 5.10: SPEX performance over 1 hour of run time with a VE size of 256×256 and different numbers of hotspots. (a) Mean processing delay; (b) Max processing delay; (c) Accuracy.

Figure 5.11: Cost of using one-round transactions for global synchronization instead of simple message passing.

A host computes the one-way delay it took for each timestamp it receives to get from the client to it, be correctly serialized according to the commit protocol, and be processed by the server. We compare this delay to when the client directly sends the message to all servers. Isolation mode was set to clock mode, which performs well under heavy contention. Results are an average of 3 runs each.

The results in Figure 5.11 show that with 1 server, i.e., point publications or area publications that fit in a single partition, there is no distinguishable difference between using transactions and direct message passing. The commit protocol is optimized to take only a single RTT to complete for single-server transactions.
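A simplified latency model of this comparison can be sketched as follows. The assumptions come from the text: a single-server commit completes in 1 RTT, a multi-server commit needs 2 RTTs, and direct one-to-many message passing always costs 1 RTT; real latencies additionally include processing and queueing, which this model ignores:

```python
def commit_rtts(servers):
    """Round trips for a one-round transaction: a single-server commit
    completes in 1 RTT; a multi-server commit needs 2 RTTs for its
    two-phase (prepare + commit) protocol."""
    return 1 if servers == 1 else 2

def latency_vs_messaging(servers):
    """Ratio of transaction latency to plain one-to-many message
    passing, modelled as a constant 1 RTT regardless of fan-out
    (the sends proceed in parallel)."""
    return commit_rtts(servers) / 1.0
```

Under this model the overhead of global serialization is bounded by a factor of 2x regardless of fan-out, which matches the gap observed in Figure 5.11.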
The number of messages that have to be delivered increases linearly with the number of servers, and as a result so does latency. For higher numbers of contacted servers the commit protocol uses 2 RTTs. The gap between message passing and transactions increases with the number of servers but never exceeds a factor of 2x. Hence, the lower the RTT between the hosts, the lower SPEX's processing delay will be. We expect SPEX to have a higher processing delay in the presence of area publications than what we measured in Section 5.7.1, but as observed in this section, the difference will be on the order of a few extra milliseconds. SPEX will live up to its promise of unrestricted area publications.

5.8 Related Work

5.8.1 Commercial Deployments

Successful commercial systems currently exist that support large-scale VEs. Second Life [33] uses partitioning to assign each square region of size 256×256 to a specific core [34]. Each region can handle at most 100 actors, and actors can only influence the current region they reside in. World of Warcraft [46] uses sharding to separately run multiple parallel instances of the VE [99]. Users are assigned to a specific shard and cannot migrate across shards, nor can they interact with other shards. EVE Online [17] supports numerous players in a single VE. It uses a solar-system architecture [1], but at its heart is a single centralized SQL database. Pikkotekk [27] holds the record of supporting 999 players in a single FPS game. A group of cell servers handle different cells (i.e., partitions) of the VE. Message passing is used to communicate between the cells. However, all users connect to a central Pikko Server which acts as a mediator between users and cell servers. While it supports area effects, distributed consistency is completely ignored. SPEX uses a fully distributed architecture. Users do not have interaction limitations, nor is there any central component.
SPEX supports area publications and explicitly deals with consistency.

5.8.2 Query/Broadcast-based

Our work focuses on support for low-latency interactions such as FPS games. Colyseus [56] and Donnybrook [55] also aim to support scalable FPS games. Their approaches, as well as the later cloud-based architecture by Tayarani et al. [104], use two classes of update rates for interest management: a low-frequency position update (or spatial query, for Colyseus) is used for neighbour discovery, and high-frequency updates via direct connections are used for actual interactions. Donnybrook has demonstrated up to 900 players via simulations, while Tayarani et al. have shown up to 320 players interacting in a cloud environment. However, the low-frequency update is broadcast globally, which imposes an inherent upper bound on scalability. The query time for neighbour discovery in Colyseus increases logarithmically with the number of actors, so the system will cease to satisfy real-time requirements beyond a certain scale. SPEX uses progressive partitioning, so a host only needs to know about a bounded number of partitions, regardless of the total number of actors. SPEX has also been practically deployed.

5.8.3 Partitioning

Spatial-partitioning-based systems such as N-Tree [83] and OPeN [77] both utilize quad-trees to dynamically partition the VE. N-Tree aims to support spatial multicast (i.e., publication), while OPeN focuses on spatial query (i.e., subscription). While both also provide dynamic partitioning, they do not support the combination of publication and subscription as in SPS, and both have been mainly evaluated by simulations or analysis [83]. SPEX supports both publication and subscription, with the more general primitive of SPS, and its feasibility is demonstrated using actual experiments. Matrix [93] uses dynamic partitioning to distribute a VE between multiple hosts.
However, it uses a centralized server to hold the partition mapping, which is updated upon each modification, making it both a bottleneck and a single point of failure. In contrast, SPEX uses a scalable and fully distributed partition hierarchy.

Previous attempts have been made to use spatial partitioning in conjunction with scalable pub/sub; e.g., SimMud [91] assigns an application-layer multicast channel (based on Scribe) to each rectangular region. However, SimMud uses fully static partitioning and has no load balancing. SPEX uses progressive partitioning while supporting adaptive load balancing.

5.8.4 SPS

S-VON [87] and VSO [86] are two recent proposals supporting SPS based on Voronoi partitioning. S-VON enhances a Voronoi-based Overlay Network (VON), which provides spatial subscription, with the additional function of spatial publication. However, it has no built-in mechanism for dynamic load balancing. VSO provides SPS based on a super-peer design, while dynamically adjusting the Voronoi regions to balance load. SPEX differs from VSO mainly in two aspects: 1) the partitioning is based on a hierarchy as opposed to flat Voronoi regions; 2) a distributed transaction provider is used to maintain consistency and reliability guarantees. While VSO has demonstrated up to 1000 entities under balanced loads, both S-VON and VSO have only been verified by simulations.

The Data Distribution Management (DDM) [102] mechanism from the HLA standard is an earlier form of SPS. Initial works use only a single server to perform the task of interest management, while subsequent works use grid-based partitioning and dynamic multicast address assignment to scale up DDM operations [59]. DDM for discrete-time simulations (i.e., non-real-time simulations where the logical clocks of entities may advance at different time-scales) has also been introduced [115].
However, the limited number of IP-based multicast addresses, as well as the fixed-size nature of grid-based partitioning, are classical bottlenecks to scalability. SPEX provides dynamic partitioning in which the number of partitions is proportional to the number of users, while its multicast channels are far more numerous, as they are logical and provided at the application layer.

5.9 Conclusions and Future Work

We present SPEX, a system capable of supporting a large number of concurrent users in a VE without common limitations, e.g., density caps or limited interactions. The system can balance scalability and operation costs by dynamically adjusting the pool of SPEX hosts. SPEX decouples publications from subscriptions, allowing users to publish to locations outside their current AoI, i.e., long-range interactions. It uses a fully distributed architecture with no components required to maintain global state, while still allowing information to be located efficiently using a distributed hierarchy. Our contributions include: 1) enabling long-range interactions with spatial pub/sub (SPS) based on a distributed transaction provider; 2) adaptive load balancing using a combination of progressive partitioning and scalable pub/sub to address the user density issue; and 3) evaluating the practicality of SPEX at large scale, both in a testbed and over the Internet, with a real system. Our evaluation shows that SPEX scales to 750 actors and provides low end-to-end latencies and high accuracy. SPEX thus is an architecture suitable for hosting the next generation of online games: epic-scale FPS games.

SPEX focuses on VE distribution, interest management and message dissemination. One-round transactions provide the means to correctly synchronize events spanning multiple servers, but we did not extensively deal with conflicts occurring between users, which is important to support more sophisticated game states and logic.
Our future work is thus to create a complementary component to simplify dealing with user conflicts. SPEX takes user events and translates them to SPS functions. It creates per-user interest sets and AoEs. A future system could use this information and distributed transactions to deal with user interactions and conflicts. By using multiple transactions per user to overestimate users' range of effects, we can then deterministically resolve conflicts in a fair manner.

Chapter 6

iEngine

6.1 Introduction

Relational Database Management Systems (RDBMSs) have for decades served as an integral part of computing environments and are the fundamental component for data storage in almost all industrial and commercial environments. Enterprises rely on RDBMSs for data storage, management and analysis due to certain key features. An RDBMS provides a high degree of expressiveness, a sufficiently simple data model, a sound mathematical foundation, and widely accepted standards.

Data is made available through the Structured Query Language (SQL) as the universal language. SQL is simple, yet general, and simplifies interactions with the data as it allows a very broad set of possible questions to be asked [45]. SQL is well understood and has enabled an ecosystem of management and operator tools that help design, monitor, inspect, explore, and build applications. Additionally, SQL is system agnostic. The application doesn't need to know the details of how the RDBMS is implemented. A SQL programmer can reuse their API and UI knowledge across multiple back-end systems. This reduces application development time with increased code quality. It appears RDBMSs are likely to continue playing their current role for the foreseeable future [45].

Traditional relational databases, including those from Oracle and IBM, are built with a shared-everything centralized topology. They scale up their throughput by adding more expensive hardware with immense licensing costs.
With the emergence of cloud computing and hosted services, Database-as-a-Service (DBaaS) is sought as an attractive new model of systems deployment, and an alternative to expensive centralized solutions.

DBaaS provides the luxury of a DBMS in the form of a service hosted in a cloud environment [4, 19]. Service administration is mostly taken care of by the cloud provider, which has to deal with configuration, failures and outages. Resources are dynamically provisioned based on demand, inherently providing scale-in and scale-out as required. Most importantly, the application is exposed to a very appealing pay-per-use cost model. It only pays for what it uses, without being forced to predict (and pay for) what it might use.

The majority of DBaaS offerings have not yet been able to provide one of the most crucial features of hosted services: scaling beyond one computing node. Amazon RDS [4] provides the feature of scaling out onto a larger machine when demand increases. However, it can only go as far as the most powerful computing node Amazon has to offer, and not beyond [120].

Over the past years, large-scale applications across the spectrum have faced a steep growth in user demand. In response, the scalability of RDBMSs has become an active area of research. The objective is to provide the ability to scale both in terms of storage capacity and query processing power.

Partitioning [29, 90] is a widely employed technique used to tackle scalability, where the DBMS splits the data into smaller portions, and indexing and isolation are handled independently and locally to each partition. Work in this area continues to find ways to efficiently partition data so as to reduce costly distributed transactions [73, 74, 100], i.e., operations that cross partition boundaries. This requires a priori knowledge of all queries that may ever be executed in the system.
With the current complex data querying requirements of applications, it is often hard to produce a good partitioning scheme that both helps scalability and reduces the number of distributed transactions.

On the other hand, scalability has been achieved by shifting towards data consistency models more relaxed than strict ACID compliance, and through data access primitives far less expressive than SQL [65, 76]. The new abstraction has been key to the success of some systems, but the weak-consistency data model gives developers a hard time trying to build stable, error-free systems [51]. Much of the heavy lifting done in the DBMS is pushed up into the application, adding unwanted complexity. Also, the narrow data access primitives suffice to serve only specific types of applications and come nowhere near the generality of SQL. This has resulted in a general lack of interest from enterprises in the NoSQL paradigm [113].

While our ultimate goal is to implement a highly scalable, truly elastic RDBMS, the most important question we seek to answer is whether we can build a fully SQL-compliant RDBMS without having to make compromises. How can an RDBMS simultaneously provide scalability, consistency and generality?

In current RDBMS architectures, from the query planning module to the storage module, all the components are tightly coupled with each other in a single-node setup. Commercial RDBMS engines are highly sophisticated and optimized, with a solid mathematical foundation. Re-designing database concepts in a distributed fashion is not a trivial task by any means. Instead, we take a different approach and consider how multiple RDBMS engines can operate on the same data set simultaneously, without having to know about each other.

We present iEngine, a memory-resident distributed database storage engine. Data is stored in memory for fast access and logged to disk for durability.
iEngine aims to scale in both dimensions of storage throughput and query processing power without having to sacrifice any part of SQL. An unmodified DBMS engine is built over a unified distributed storage layer which is capable of storing and indexing data and isolating concurrent operations from each other. The distributed storage can scale its throughput and storage capacity by adding more nodes to each of the components used for storage, indexing or isolation. Query processing power scales by adding more DBMS engines to the system.

The storage layer uses partitioning, which is done internally and automatically. DBMS transactions are then implemented using multiple one-round transactions, i.e., high-performance short-lived transactions which use two-phase commit to execute. They provide the means necessary for a strong-consistency data model. Concurrent transactions are isolated from each other using a distributed isolation component capable of locking arbitrary ranges of the data.

Our evaluation of iEngine shows it to be a viable solution for DBaaS. It scales both for simple web-based operations (lookups and inserts) and in the presence of complex range operations, and outperforms a single-node DBMS engine (InnoDB).

Figure 6.1: Conventional workflow in an RDBMS.

iEngine is a general-purpose storage engine compatible with modern DBMS engines. It architecturally supports being simultaneously used by different engines. This provides the application the freedom to choose the most suitable engine for each task at hand.

6.2 Current RDBMS Architectures

In RDBMSs, data is stored in different tables. Each row of a table is a separate data item and can have multiple different attributes, each stored as a separate column in its associated row. Tables are created, destroyed, and populated with items all through SQL queries specified by the user.
Transactions can be used to aggregate different queries into an atomic set.

Figure 6.1 shows the common components in a conventional DBMS architecture. The components can broadly be categorized into those used for SQL processing and those used for storage.

6.2.1 Query Processing

When a query arrives, it is first parsed and checked for correct SQL semantics. Then an optimal execution strategy is devised by the SQL processing layer. This is the layer that provides the rich querying features of the DBMS.

The optimizer uses meta values obtained from the underlying storage as measurements. Examples of meta values are the size of the index, average scan time, index lookup time and delete time for a given table. It then uses these measurements to come up with an efficient plan in order to return the data as quickly as possible to the user. Numerous factors contribute to how the execution is planned: the number of rows that have to be fetched from storage, whether to use the index or to resort to a table scan, and which key would require fetching fewer rows are all taken into consideration.

6.2.2 Storing & Indexing

Once an execution plan is decided, data is fetched from the storage layer. One way to obtain data from a table is to check each row of the table against the query to determine if it should be part of the result, a task known as a table scan. Table scans are expensive and don't scale as the data set grows. An alternative is to use the index.

An index is a data structure that improves the speed of data retrieval operations on a database table at the cost of additional writes. Hash tables and B-trees are common structures used for indexing. The index uses more storage space to maintain an extra copy of data as items are added to, modified, and deleted from the data set.
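As a minimal illustration of index-based retrieval versus a table scan, the following sketch keeps a sorted (value, row id) list as a secondary index over one column, standing in for the B-tree a real engine would use; the row layout and class are illustrative assumptions, not iEngine's actual format:

```python
import bisect

class IndexedTable:
    """Rows are dicts; a sorted (value, row_id) list serves as a
    secondary index on one column, much as a B-tree would."""
    def __init__(self, rows, indexed_col):
        self.rows = rows
        self.index = sorted((r[indexed_col], i) for i, r in enumerate(rows))

    def lookup(self, value):
        """Index lookup: binary search for an exact value, O(log n)."""
        lo = bisect.bisect_left(self.index, (value, -1))
        out = []
        while lo < len(self.index) and self.index[lo][0] == value:
            out.append(self.rows[self.index[lo][1]])
            lo += 1
        return out

    def range_query(self, low, high):
        """Index range query: one contiguous slice of the sorted index."""
        lo = bisect.bisect_left(self.index, (low, -1))
        hi = bisect.bisect_right(self.index, (high, float("inf")))
        return [self.rows[i] for _, i in self.index[lo:hi]]

    def table_scan(self, predicate):
        """Fallback for unindexed constraints: check every row."""
        return [r for r in self.rows if predicate(r)]
```

The three methods correspond to the three constraint forms a query can present: a specific value, a value range, and arbitrary values.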
Indexes can be created using one or more columns of a database table, providing the basis both for rapid random lookups and for efficient access to ordered records [14].

The query specifies how the data should be retrieved. The information in the query is presented in the form of constraints, such as items with specific values, items with values within a specific range, or items with any values. These result in an index lookup, an index range query, and a table scan, respectively. Once the data has been located (either via the index or from the table directly), it is read from data storage and the results are passed back to the query processing layer.

6.2.3 Isolation

A DBMS allows multiple transactions to execute concurrently for increased throughput. Consequently, different transactions must be isolated from each other when required. Locking is the primary technique used for isolation in RDBMSs; it hides the transient state of one transaction from others. As a transaction executes, the rows of the tables it touches are locked to ensure no other transaction can modify them until the transaction finishes. Based on the details of what to lock and when to release the locks, four different isolation modes are possible:

• Read Uncommitted There is no isolation among transactions. Any transaction can see intermediate results of other transactions. In terms of locking, read/write locks on items and ranges are only held during the execution of the current statement and not the entire course of the transaction.

• Read Committed Transactions are only allowed to see values once they are committed by others. A transaction can see values committed after it started executing. Write locks on items are held for the duration of the transaction, while read locks and range locks are held for the current executing statement.

• Repeatable Reads Transactions can see only values committed before they start.
Read/write locks on items are held for the duration of the transaction and range locks are held for the current executing statement.

• Serializable As if all transactions were executed in a global serial order. All read/write locks and range locks are held for the duration of the transaction.

Serializable isolation provides the strongest form of consistency, and is the hardest to provide. It is the only form of isolation which requires complex range locks. A range lock not only locks items that currently exist in the database falling within the given range, but also prevents future items from being inserted into that range until the range lock is released. Hence, simple item locks will not suffice. This ensures that a phenomenon known as phantom reads can never occur.

6.2.4 Distribution Challenges

Non-distributed RDBMSs follow a monolithic, shared-everything architecture with tight coupling between the components. The query processing components are highly sophisticated and based on a solid mathematical foundation. Finding distributed alternatives to database concepts is by far a non-trivial task.

In the storage layer, indexing and locking are two important components. The indexing system is expected to be able to answer complex queries over the items it stores, such as finding items with specific values on selected attributes. Since almost every operation needs to either read from or write to the index, the index is an important potential point of contention.

The locking mechanism is required to lock both individual items and ranges of items that have attribute values in specific ranges.
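Why item locks alone cannot prevent phantoms can be sketched with a toy single-column lock table; the class and method names below are invented for illustration, not part of any real system.

```python
class RangeLockTable:
    """Toy single-column lock table: a held range lock [lo, hi] blocks both
    access to existing keys in the range and inserts of new keys into it.
    Blocking inserts of keys that do not exist yet is exactly what a
    per-item lock cannot do, and what prevents phantom reads."""

    def __init__(self):
        self.ranges = []                 # list of held (lo, hi) range locks

    def lock_range(self, lo, hi):
        self.ranges.append((lo, hi))

    def unlock_range(self, lo, hi):
        self.ranges.remove((lo, hi))

    def may_insert(self, key):
        # An insert conflicts if the new key falls inside any held range,
        # even though no lock exists on the (not-yet-existing) item itself.
        return not any(lo <= key <= hi for lo, hi in self.ranges)

locks = RangeLockTable()
locks.lock_range(10, 20)      # e.g., a serializable scan over 10 <= x <= 20
print(locks.may_insert(15))   # False: the insert would be a phantom
print(locks.may_insert(25))   # True: outside every locked range
```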
The need for efficiently handling complex range locking renders most available scalable locking systems inefficient for direct use in a distributed RDBMS (see Section 6.8.3).

Finally, the RDBMS heavily relies on the strong consistency of both of these components, which presents a major challenge for scaling the storage layer of a single-node RDBMS.

6.3 iEngine

Figure 6.2 presents the general architecture of iEngine. We take a commodity off-the-shelf RDBMS engine and scale it out horizontally by substituting its single-node storage with a distributed storage layer, leaving the engine intact. Since the engines are unmodified, iEngine provides the same level of generality as a traditional RDBMS. Similar to a single-node setup, an engine is given the illusion that it is the only one running in the system and granted access to all of the data. In reality, numerous engines are allowed to coexist simultaneously by sharing the same storage layer.

The interface between an RDBMS engine and the storage layer masks out the underlying distributed nature and correctly isolates the engines from each other. Data storage, indexing and locking are the core functionalities of the distributed storage layer, where each component can be scaled independently from the others. With such a loose coupling between query processing and storage, iEngine can scale in two dimensions. In the first dimension, the storage can transparently scale to provide more storage capacity and operation throughput. In the second dimension, the number of RDBMS engines interacting with the storage may scale to accommodate more query processing power and handle higher transaction rates.

Figure 6.2: General architecture of iEngine.

The distributed storage layer can be broken down into three separate components used for storing data items, indexing the data, and isolating transactions through locking. Each component scales independently to provide the ability to accommodate different workloads.
For example, a rarely modified data set only requires the storage component to scale, while a read-only query-heavy workload would require more indexing power, and a write-heavy workload with long-lived transactions would need scalable locking performance.

Figure 6.3 illustrates the different components of the distributed storage layer. Theoretically, the separate blocks can reside on different physical machines. The storage handler is the binding between the query processing unit of an RDBMS engine and the distributed storage layer. It communicates with the front-end of the storage layer, which is the transaction handler.

Figure 6.3: Distributed storage layer architecture.

The transaction handler is the only component in the system that knows about SQL transactions and queries, and translates them into the appropriate set of smaller operations. The smaller operations use a key/value style, where each operation is assigned a globally unique key. This ensures dependencies between transactions touching the same data are correctly detected and isolated. The transaction handler uses one-round transactions to convey the key/value operations to the back-end servers that store, index and lock the data.

The back-end servers serve requests following the one-round transaction commit protocol. They use disks to synchronously log the data for durability. Other components ensure enough data is stored inside the servers to provide crash recoverability from failures of any part of the system.

6.3.1 Query Processing

We use MySQL [28] as the query processing RDBMS for our prototype implementation of iEngine. MySQL has widespread usage across different domains, and is used as a commercially hosted database service [4, 19]. As illustrated in Figure 6.4, it provides a storage plug-in interface. The core components of MySQL translate transactions into storage operations working on a record basis.

Figure 6.4: MySQL architecture.
By extending the plug-in interface, a custom-built storage layer can easily be integrated into the RDBMS. Additionally, the interface can choose the level of consistency it provides.

A user can connect to any instance of MySQL to have full access to the entire data. Existing tools, e.g., MySQL Proxy, can be directly used to assign users to engines to balance load based on congestion. MySQL uses multiple threads to deal with user requests. Each thread is assigned its own storage handler instance which independently talks to storage. Thus user threads don't share state with each other and avoid expensive synchronization overheads between themselves.

6.3.2 Storage Handler

The SQL operations understood by the storage handler are those dealing with table creation/deletion, ones that define transaction boundaries (Start, Commit, Rollback), and the standard CRUD operations (Create, Retrieve, Update, Delete). The storage handler is responsible for mapping SQL-based queries to commands understood by the storage layer. Table manipulation and transaction definitions have a simple translation to storage commands. The CRUD operations need some processing to compute their respective key/value based operations.

Each operation's key is computed based on the table's name and information provided by the engine. Using the schema of a table and a given query, the storage handler checks to see if exact values on all attributes have been provided. If so, the query operates on an individual item. If not, the query is operating on a range. The operation's key, type and additional information are sent to the transaction handler.

For individual items, the key is computed as a concatenation of all the attributes in the table's namespace. For ranges, based on the data types specified in the schema, the item's key is computed as a range key.
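This key derivation might be sketched as follows. The separator, encoding, and function names are our own invention; a real implementation would encode values according to their column types so that byte-wise ordering matches attribute ordering.

```python
def item_key(table, attrs):
    """Key for an exact-match query: the table name acts as a namespace,
    followed by a concatenation of all attribute values."""
    return "/".join([table] + [str(v) for v in attrs])

def range_key(table, min_attrs, max_attrs):
    """Key for a range query: a pair of keys bounding the range, one built
    from the minimum possible attribute values and one from the maximum."""
    return (item_key(table, min_attrs), item_key(table, max_attrs))

print(item_key("stock", [42, "item-7"]))        # stock/42/item-7
print(range_key("stock", [42, ""], [42, "~"]))  # ('stock/42/', 'stock/42/~')
```

An exact-match key is just the degenerate range whose minimum and maximum coincide, which matches the footnote below.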
A range key consists of two different keys, one specifying the minimum possible attribute values and one specifying the maximum.¹

Storage handlers don't have a strict affinity to a specific transaction handler and are assigned to them in a round-robin fashion. For performance reasons, we try to run a transaction handler on the machine local to each MySQL engine to take advantage of faster communications. Each storage handler instance makes a separate connection to the transaction handler, which uses blocking IO to prevent the engine from issuing requests faster than the transaction handler can keep up with.

To reduce extra communications with storage, the storage handler caches results returned from previous requests to be used by subsequent requests on the exact same data. Caching only happens for the duration of a transaction and when the isolation level can ensure cached data may not have been modified by other transactions. Cache items are modified according to operations performed by a transaction to ensure they reflect the latest results on subsequent requests.

The storage handler interface is generic enough to support other RDBMS engines. This allows a heterogeneous mixture of engines to operate on the same data, each specialized in a different specific task.

6.3.3 Transaction Handler

The transaction handler is the front-end binding point to the distributed storage layer. The main duties of the transaction handler are to store and index data, while also acquiring the required range or item locks. It is the only part of the storage layer that knows about transaction boundaries. All lower layers beyond this point only understand individual key/value operations.

¹ A key can be thought of as a range key where the minimum and maximum are the same.

Commands arriving from the storage handlers are transformed into one or more separate operations.
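The per-transaction result cache described above might look roughly like the sketch below; it is hypothetical and omits the isolation-level check that gates whether caching is allowed at all.

```python
class TxnCache:
    """Caches storage results for the duration of one transaction. Writes
    performed by the transaction update the cached copy so later reads see
    the transaction's own effects; ending the transaction clears everything,
    so nothing is ever served across transaction boundaries."""

    def __init__(self):
        self.results = {}

    def lookup(self, key, fetch):
        # fetch() hits the storage layer only on a cache miss
        if key not in self.results:
            self.results[key] = fetch(key)
        return self.results[key]

    def write(self, key, value):
        self.results[key] = value    # keep cache in sync with this txn's writes

    def end_transaction(self):
        self.results.clear()

calls = []
def fetch(key):
    calls.append(key)
    return {"row": key}

cache = TxnCache()
cache.lookup("k1", fetch)
cache.lookup("k1", fetch)    # second lookup served from cache
print(len(calls))            # 1 -- storage was hit only once
```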
Some operations are executed as they arrive, e.g., searching for specific data items, while others are postponed until the transaction is committed, e.g., modifying stored data.

Data is stored and indexed inside Innesto, a distributed strongly consistent key/value datastore. Innesto uses spatial partitioning to support search on multiple different attributes and provides consistency by using one-round transactions. A modified version of Innesto is also used for range locking.

When a table is created, two separate instances of Innesto are created. One is used for indexing and storing data items, and one is used for logical locking of items and ranges of the table (the lock manager). In contrast to mandatory locking, which is usually hardware enforced, logical locking is a higher-level mechanism where the existence of a data item within a data structure is interpreted as the given item being locked.

A table could have any number of columns, where only a subset of them are indexed. For example, a table may have ten different columns but only index three of them. This means queries that provide constraints on values of the three indexed columns will complete by using the index, while ones that use the non-indexed columns have to perform a table scan. As a result, in this example two three-dimensional Innesto instances are created.

Data items are stored by inserting them inside the index. To lock an item, its key is inserted inside the lock manager, and removed for unlocking. Ranges are locked in a similar fashion, where the range key is inserted and removed to lock and unlock the desired range.

6.3.4 Correctness

To verify correctness of the overall system, we classify correctness into two categories: data integrity and data correctness. High-level RDBMS logic, such as executing transactions, is conducted by commercial RDBMS engines on top of the distributed storage.
Since we are not modifying the engine nor changing any database fundamentals, data integrity is a given in the architecture.

Data correctness implies that all operations should comply with the semantics required by an RDBMS. For example, each SQL transaction should be provided with the isolation level it requested, and rolling back a transaction should not create a transient inconsistent state, no matter how brief. Data correctness is affected by the distributed nature of the system. Since multiple operations may be simultaneously operating on the shared data, it should not be corrupted due to race conditions between (possibly) conflicting operations. iEngine uses one-round transactions to provide data correctness and Innesto for range locking and isolation. The interface to the storage layer complies with what the RDBMS requires and ensures data correctness is never violated.

6.4 Range Locking

Innesto provides a data partition hierarchy abstraction. Parent partitions cover their children. Each Innesto operation is tested against the hierarchy to find target partitions during execution. Single-item operations usually touch a single partition, where the search operation is an exception that touches multiple. We extend this idea to provide the ability to perform range locking.

Two operations are supported for range locking, both of which operate on range keys. Inserting a range key locks the range specified by it, and removing the key unlocks the range. A single item can be locked by inserting a range key which has its min and max set to the item's value.

Figure 6.5 illustrates an example of acquiring a range lock. While a single dimension is shown for simplicity, the same logic applies to higher dimensions. Executing a lock operation starts from the root partition to find all target child partitions that intersect the range key. For each target, the range key is fragmented down into a key confined by the bounds of the partition and is checked against the set of existing lock fragments within it.
A lock fragment has a reference count of how many owners currently hold it.

As with conventional locking, read and write locks are supported. When there is a conflict with a write lock fragment, or the new lock is a write lock that conflicts with a read lock fragment, locking fails. Conflicts between read lock fragments are handled by finding their intersection. If they align perfectly, the reference count of the existing lock is incremented. Otherwise, three new lock fragments are created. The first has a reference count of one, representing the part of the new lock not in the existing range. The second is the part of the existing fragment not intersecting the new lock, with its reference count unchanged. The third is the intersection fragment, with a reference count of the existing fragment's plus one.

Figure 6.5: Range lock example. The numbers represent the reference count of each fragment. Note that the partitions and locks could have any number of dimensions.

6.4.1 Locks & Deadlocks

The commit protocol of one-round transactions used by Innesto uses short-lived locks. If locking fails at any Participant, all other Participants release their locks and the transaction is retried after waiting an exponentially growing random timeout. This prevents deadlocks. However, performance degrades drastically under contention.

In the face of a locking conflict, bandwidth is wasted by sending the transaction back and forth on each try. The transaction sits idle for a timeout hoping to make progress on the next attempt. Unnecessary contention is created by Participants which have to lock and release items when locking fails on another Participant. In the case of executing RDBMS transactions, the transaction can't make progress until its locks are acquired. Consequently, the system is brought to a complete halt beyond a certain point.

To avoid this, we changed the locking mechanism of one-round transactions to use blocking locks.
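The read-lock fragment splitting described above can be sketched in one dimension. The interval representation and function name are our own: fragments are half-open (lo, hi) intervals carrying a reference count.

```python
def split_read_lock(existing, new):
    """One existing read-lock fragment vs one new read lock.

    existing is (lo, hi, refs); new is (lo, hi). Perfect alignment just
    bumps the reference count; otherwise the overlap is split out: parts
    of the new lock outside the existing fragment get refcount 1, parts
    of the existing fragment outside the new lock keep their refcount,
    and the intersection gets the existing refcount plus one."""
    (elo, ehi, refs), (nlo, nhi) = existing, new
    if (elo, ehi) == (nlo, nhi):              # perfect alignment
        return [(elo, ehi, refs + 1)]
    lo, hi = max(elo, nlo), min(ehi, nhi)     # intersection bounds
    out = []
    if nlo < lo: out.append((nlo, lo, 1))     # new-only, left side
    if nhi > hi: out.append((hi, nhi, 1))     # new-only, right side
    if elo < lo: out.append((elo, lo, refs))  # existing-only, left side
    if ehi > hi: out.append((hi, ehi, refs))  # existing-only, right side
    if lo < hi:  out.append((lo, hi, refs + 1))
    return sorted(out)

# Partial overlap produces the three fragments described in the text:
print(split_read_lock((0, 10, 2), (5, 15)))
# [(0, 5, 2), (5, 10, 3), (10, 15, 1)]
```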
As part of the commit protocol, if a transaction detects a locking conflict, it waits for the conflicting locks to be released. The Participant can still make progress on other transactions while one waits for its locks. Deadlocks are prevented by using a timer per transaction on each Participant. When the timer fires, it is a strong indication that there is a possibility of a deadlock. All locks are released upon a timeout as before, with the addition that the SQL transaction is also rolled back.

6.4.2 Fairness

Starvation is a classical problem with locking systems. It occurs under contention when less restrictive locks constantly win over stricter locks and starve them out. For example, read locks on a single item are less restrictive than write locks, since they are compatible with each other and conflict only with write locks, while write locks are always in conflict with both read and write locks. With the presence of enough read and write locks on the same item, if not dealt with correctly, read locks will constantly win over write locks indefinitely, preventing writes from making any progress.

With range locking, a new form of starvation occurs. As the range of a range lock broadens, it becomes more restrictive. Locks with smaller ranges (and single-item locks) have the potential to starve broader range locks. We circumvent this by using an acyclic directed dependency graph for pending locks. The graph provides FIFO locking. Figure 6.6 depicts an example of range locking requests and the corresponding lock dependency graph.

Each node in the graph is a lock request pending on a conflict. A new lock request checks both the graph and existing locks for all possible conflicts and adds a dependency to them. A node with no dependencies is granted its lock. When a lock is released, the node is removed from the graph and locking resumes.
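The pending-lock dependency graph can be sketched as follows. This is an illustrative toy, not the actual implementation: `conflicts` stands in for the range-intersection test, and held and pending locks share one structure for brevity.

```python
class LockGraph:
    """FIFO grant order via a dependency graph: a new request depends on
    every earlier request it conflicts with, and is granted only once it
    has no remaining dependencies."""

    def __init__(self, conflicts):
        self.conflicts = conflicts   # conflicts(a, b) -> bool
        self.pending = []            # all requests, in arrival (FIFO) order
        self.deps = {}               # request -> set of requests it waits on

    def request(self, lock):
        self.deps[lock] = {p for p in self.pending if self.conflicts(p, lock)}
        self.pending.append(lock)
        return not self.deps[lock]   # granted immediately iff no conflicts

    def release(self, lock):
        # Drop the node; anything that waited only on it becomes grantable.
        self.pending.remove(lock)
        del self.deps[lock]
        granted = []
        for other, waits in self.deps.items():
            waits.discard(lock)
            if not waits:
                granted.append(other)
        return granted

# Ranges conflict when they overlap. The second request overlaps the first,
# so it must queue behind it, preserving FIFO order.
overlap = lambda a, b: a[0] <= b[1] and b[0] <= a[1]
g = LockGraph(overlap)
print(g.request((0, 10)))    # True  -- no conflicts, granted immediately
print(g.request((5, 7)))     # False -- queued behind (0, 10)
print(g.release((0, 10)))    # [(5, 7)] -- now grantable
```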
A node keeps track of all possible conflicts, since intermediate nodes could be removed at any time due to timeouts or events occurring on other Participants.

Figure 6.6: Red are existing range locks and gray are pending range lock requests. (left) Order in which range lock requests arrive. (right) Equivalent lock dependency graph.

FIFO locking adds some extra waiting time to a range lock. In Figure 6.6, lock 5 has no conflict with an existing lock, only with the pending lock 3. Without the graph, lock 5 would be granted its lock as soon as it arrived. However, this would starve lock 3, which would then also have to wait for lock 5 to finish.

6.4.3 Timeout Tuning

Tuning the timeout value for deadlock detection is an important task. A value too short would result in a premature abort for most transactions. This wastes the time they spent inside the dependency graph, and reduces the overall throughput of the system. Setting the timeout to a larger value would cause deadlocks to be detected too late. Blocked transactions would spend a long time before finally being aborted, keeping others from making any progress.

The workload of different partitions differs and varies over time. We compute the timeout value per Innesto partition by profiling each one's workload. A histogram of the times it takes to acquire the locks on a partition is computed during the profiling period. The timeout is set at a configurable percentile of the histogram (e.g., 95%).

Profiling data is migrated with partitions as they are split and migrated in Innesto. Lower percentiles will result in higher throughput in the presence of low contention. As contention increases, higher wait times become normal and most of the slow transactions will start aborting. Thus we allow the option to periodically profile the workload to decide new timeout values.

6.5 Anatomy of a Transaction

A transaction has three significant boundaries: start, commit/rollback, and completion.
Any CRUD operations performed between start and commit are performed atomically and isolated from other transactions. In the case of rollback, all effects of the transaction are erased. The time between commit/abort and completion is when the storage layer is busy applying or removing the effects of the transaction.

SQL CRUD commands are sent by the storage handlers to their respective transaction handler. The transaction handler transforms the CRUD requests on a table into individual requests on the indexing and locking Innesto instances associated with the table. Each operation's key clearly defines the parts of the data it will be touching, so that the locking mechanism works correctly.

Regardless of the type of operation, locks are acquired immediately based on the operation's key. Under the modified locking protocol, locking fails only due to (suspected) deadlocks, which cause the transaction to roll back. Read operations (Lookup) are executed once the locks have been acquired. Executing write operations (Create, Modify, and Delete) is postponed until commit time. This reduces the cost of rolling back a transaction halfway, as only locks will have been acquired by the transaction. Once commit is received for the transaction, write operations are executed, after which locks are released.

There are a few exceptions to the mechanism described above based on what SQL expects when executing a transaction. For example, Create expects the item to not exist. Once the lock on the item is acquired, the transaction can be sure no other transaction can create the item until it is done. However, if the item already exists, it will be too late to discover its existence at commit time. For this scenario, Create also performs a Lookup immediately after acquiring locks to ensure it is a valid command.

6.6 Fault Tolerance

Failures are inevitable in distributed systems, in the form of hardware failures, network partitions or full-scale datacenter outages.
A failure could leave data in an inconsistent state due to partial execution of transactions. Thus we consider different failure scenarios to ensure that no kind of failure can introduce inconsistencies.

6.6.1 Server Failure

Server failures are detected by other servers and by machines trying to commit one-round transactions. One-round transactions use logging to protect the data from failures. All write items are logged synchronously as part of the commit protocol. Upon the failure of a server or group of servers, their log is used to reconstruct their data.

Innesto uses one-round transactions. As a result, any Innesto operation will execute to completion, even in the presence of failures. Only the execution time of operations in flight during the failure will be affected, as some will have to conclude after the failed servers have been restored.

6.6.2 Engine Failure

Engine failures are detected by their associated transaction handler instance. When an engine fails, transactions that may be committed are continued to completion. These are transactions for which the storage handler told its transaction handler to commit and was waiting for a completion notification. All other uncompleted transactions, where commit hasn't been received, are rolled back by the transaction handler.

6.6.3 Transaction Handler Failure

Transaction handlers are the only component in the storage layer that knows about SQL transaction boundaries. While it is possible for storage handlers to clean up partially executed transactions if they detect their transaction handler has failed, such an approach will not withstand simultaneous failures of storage handlers and transaction handlers.

There are two types of uncommitted transactions. The first are those that have already started but for which commit hasn't been issued yet. Such transactions have acquired locks and buffered their write operations.
The second type of uncompleted transactions are those for which commit has been issued by the storage handler but the transaction handler failed while commit was in progress. Such transactions may have already applied some write operations but not all, and locks may not have been cleanly released. To recover each type of transaction upon failure we use two separate logs: an undo log and a redo log. Both logs are stored inside servers using one-round transactions and are hence protected from failures.

An undo log includes a list of locks acquired by each transaction. Each lock operation is in essence an Innesto insert, and each unlock a remove. Innesto allows combining multiple operations into the same one-round transaction. We take advantage of this to write to the undo log each time a lock is acquired and released. This causes the undo log to be in perfect sync with the locking system even in the face of failures. To recover from a failure, the undo log is traversed to find lingering locks and remove them.

When the transaction handler receives a commit request it has to apply the write operations and release the locks. But before doing so, it makes a full copy of the write operations and writes them to the redo log using a one-round transaction. If a failure happens before the logging to the redo log completes, we treat the transaction as if commit hadn't been received. Otherwise, since the log contains a full copy of what needs to be done, the transaction can be completed by another transaction handler using the redo log before locks are released.

6.7 Evaluation

We evaluate our prototype iEngine using a cluster of commodity machines. Each machine has two quad-core Xeon E5506 processors and 32GB of memory. Evaluation is done using the TPC-C benchmark. TPC-C is a popular benchmark that extensively simulates a real-world transactionally consistent workload.
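The redo/undo recovery rule of Section 6.6.3 can be sketched as follows. Log formats and names are invented for illustration; a real implementation would read both logs via one-round transactions.

```python
def recover(txn, storage, lock_table):
    """Recover one transaction whose transaction handler failed.

    txn carries the undo-log entries (keys of locks it held) and, only if
    commit reached the redo-logging step, 'redo': a full copy of its write
    operations. Whether the redo log was completely written decides
    between rolling the transaction forward and rolling it back."""
    if txn.get("redo") is not None:
        # Redo log fully written: commit was in progress, so replay the
        # buffered writes before releasing locks.
        for key, value in txn["redo"]:
            storage[key] = value
        outcome = "rolled_forward"
    else:
        # No complete redo log: treat as if commit was never received.
        # Writes were only buffered, never applied, so storage needs no
        # changes; only lingering locks must be cleaned up.
        outcome = "rolled_back"
    for lock in txn["undo"]:
        lock_table.discard(lock)   # lingering locks found via the undo log
    return outcome

storage, locks = {}, {"stock/42"}
print(recover({"undo": ["stock/42"], "redo": [("stock/42", 9)]}, storage, locks))
# prints "rolled_forward"; storage now holds the replayed write, locks is empty
```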
Designed by the vendors that build shared-everything architectures, TPC-C represents an extremely challenging workload for a distributed RDBMS.

We compare the performance of iEngine against InnoDB, the default engine in MySQL. InnoDB is the only storage engine that ships with MySQL and supports transactions with serializable isolation. InnoDB is widely used, even in cloud-hosted database offerings [4, 19]. All experiments use the MySQL 5.5.8 source distribution and InnoDB 1.1.4. For a full list of InnoDB configuration parameters see [125]. In summary, the buffer pool was configured to use 28GB, the log was only flushed once a second using fdatasync, double-write was disabled, and the log file size was set to 1900MB. When using iEngine, MySQL internal caches were disabled to avoid using stale data. TPC-C's consistency checks were used to verify the results of TPC-C and the functionality of iEngine.

To emulate a practical workload on the system we used a benchmark client [40] which generates workloads based on the TPC-C [42] specification. We first test iEngine's scalability in Section 6.7.2, then compare how its isolation component performs compared to InnoDB in Section 6.7.3. A modified workload is used in Section 6.7.4 to test how iEngine performs under web-compatible workloads. In all our experiments in the first three sections crash consistency was disabled. We evaluate the performance costs of crash consistency in Section 6.7.5.

6.7.1 TPC-C

TPC-C simulates a complete environment where a population of terminal operators execute transactions against a database. The benchmark is centered around the principal activities (transactions) of an order-entry environment. These transactions include entering and delivering orders, recording payments, checking the status of orders, and monitoring the level of stock at the warehouses [42]. Data is stored in numerous tables, the largest of which is the Stock table, which contains 100,000 items per warehouse.
For example, a dataset with 140 warehouses has 14 million items stored in the Stock table.

Each user requests a chain of transactions from the RDBMS. A total of five types of transactions are used in TPC-C, with a mixture of read-only transactions and write-heavy ones. The most frequent transaction consists of entering a new order (i.e., the New Order transaction), which on average comprises ten different items. Close to 10% of all orders must be supplied by another warehouse. This is to enforce distributed transactions when data is partitioned based on warehouses. TPC-C requires a certain mixture of its different transactions to complete within specified time bounds for the benchmark to succeed. A run violating the expectations fails. The performance metric reported measures the number of New Orders that can be fully processed per minute, i.e., tpm-C.

The main objective of our evaluation is to throttle the system as much as possible and to see how it scales. All transactions were executed at the serializable isolation level except the Stock-Level transaction, which ran with read-committed isolation (as permitted by the TPC-C specification). User wait times were set to zero to avoid artificial delays. Before each experiment, the database is loaded with data based on the TPC-C specification.

Figure 6.7: Strong scaling of iEngine.

6.7.2 Strong Scaling

In a typical single-node RDBMS, the throughput of the system increases as the number of users making requests scales, until the saturation point. Beyond the saturation point, throughput cannot increase as the system has reached its maximum capacity. iEngine is designed to provide the ability to shift its saturation point beyond what a single machine can handle.

Strong scaling is defined as how the throughput of a system varies with a fixed problem size. We evaluate iEngine's strong scaling using a dataset with 90 warehouses and two sets of machines. One set, varying from one to three machines, only runs back-end servers.
The other set uses the same number of machines as the back-end servers to run the MySQL engines, transaction handlers and the client programs emulating TPC-C user requests [40]. Experiments are run until TPC-C starts failing.

Figure 6.7 shows that as the number of users increases, throughput rises until it saturates. Beyond a certain threshold, TPC-C starts failing due to long wait times for transactions. By increasing the system size, the saturation threshold is moved from around 3K tpm-C supporting only 750 users to around 7.5K tpm-C with 2100 users. This is evidence that iEngine can scale beyond what a single machine can handle given more processing power.

6.7.3 Contention

InnoDB represents a state-of-the-art RDBMS storage engine. It uses Multi-Version Concurrency Control (MVCC) to isolate transactions, in which data items aren't locked, but separate versions of the data are maintained for different transactions. iEngine uses logical distributed locking provided by Innesto. We wish to evaluate how iEngine's isolation performs compared to the highly optimized and centralized isolation component of InnoDB. We do this by increasing user contention.

User contention is created as the number of users operating on a fixed-size dataset increases. This causes transaction conflicts and rollbacks. This is quite common in retailer store websites where customer interest in special offers increases. Two different dataset sizes are used, one with 144 warehouses and one with 240. For each dataset and system configuration, we increase the number of users until TPC-C starts failing. For InnoDB we use a single-machine setup which runs the MySQL server and the TPC-C client. For iEngine, we use a six-machine setup, where, similar to the InnoDB setting, each machine runs all required components.

Figure 6.8: User contention. (a) Throughput. (b) Transactions per user.

Figure 6.9: Web compatible workload.

Figure 6.8 shows that with a small number of users, InnoDB outperforms iEngine.
As contention increases, InnoDB's performance drops drastically and TPC-C starts failing close to 850 users for 144 warehouses. Increasing the dataset size only shifts the breaking point, as it reduces contention. iEngine delivers consistent performance and can support over 2.5x as many users as InnoDB.

TPC-C extensively stresses different components of the system. We found locking to be the bottleneck, as most transactions need to operate on the same table. iEngine's performance depends on Innesto, which depends on how fast one-round transactions can complete. Using commodity hardware, network RTTs are on the order of a millisecond. Decreasing this by a few orders of magnitude could help improve performance.

6.7.4 Web Compatible Workload

TPC-C extensively uses range queries as part of the benchmark. Web workloads usually consist of operations on individual items and not ranges. In order to both emulate a web workload and to see how iEngine behaves under a write-intensive workload, we run TPC-C but only execute the New Order transaction, which is write dominated. We use the same experimental setup as before, where six machines are used for iEngine and one for InnoDB. The dataset size is 240 warehouses.

As shown in Figure 6.9, as before InnoDB starts off well with low user contention but throughput falls steeply afterwards. On the other hand, iEngine maintains its performance level under contention and throughput gradually drops as the number of users increases. This pattern could be equally attributed to the concurrency control mechanism and the B-tree based indexing in InnoDB. This is an indication that indexing based on Innesto is a suitable alternative to conventional B-tree based indexing.

Figure 6.10: Crash consistency.

6.7.5 Crash Consistency Cost

Crash consistency is provided to ensure data is recoverable in a consistent manner in the event of failures. To measure the cost of crash consistency, using a dataset size of 60 warehouses, we use six machines to run iEngine.
Two act as one-round transaction servers and four run the rest of the components. Logging to disk was chosen as the failure protection mechanism.

Figure 6.10 shows that adding crash consistency to iEngine incurs an overhead of 40%-50%. There are two performance costs associated with adding crash consistency to iEngine. First, each write item of a one-round transaction has to be logged synchronously. Second, at a higher level, each RDBMS transaction in iEngine has to use the redo and undo logs during execution and commit. These two factors contribute to an overall lower performance when crash consistency is enabled.

Logging to disk lengthens the completion time of one-round transactions. This causes locks to be held for longer periods, magnifying the effect of congestion. As a result, more deadlocks occur and TPC-C starts failing earlier. The performance of iEngine is sensitive to the RTT between the machines. With our commodity setup this was on the order of a millisecond. Decreasing this base RTT would significantly improve the performance of iEngine.

6.8 Related Work

6.8.1 Data Partitioning

Partitioning has been the standard mechanism for scaling out RDBMSs beyond single machines. Most scalable RDBMSs currently support partitioning in one way or another. Schism [73] uses workload-aware partitioning to minimize costly multi-partition distributed transactions. The workload of the system is periodically monitored to identify sets of tuples that are accessed together. Schism uses graph partitioning to find balanced partitions that reduce the number of cut edges, i.e., distributed transactions. Relational Cloud [74] uses a similar partitioning scheme to decide on the correct data placement across the autonomous RDBMS engines used as the back-end of a DBaaS. The efficiency of data placement depends heavily on how easily the workload can be partitioned to reduce distributed transactions.
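Schism's objective can be made concrete with a toy cost function (illustrative code, not Schism itself): tuples co-accessed by a transaction form weighted edges, and a candidate partitioning is scored by the total weight of the edges it cuts, since each cut edge stands for a potential distributed transaction.

```python
# Toy version of the workload-aware partitioning objective: count the
# co-access weight that crosses partition boundaries. Names illustrative.
from itertools import combinations
from collections import Counter

def cut_weight(transactions, assignment):
    """Total co-access weight crossing partitions (~distributed txns)."""
    edges = Counter()
    for txn in transactions:                    # each txn: set of tuple ids
        for a, b in combinations(sorted(txn), 2):
            edges[(a, b)] += 1
    return sum(w for (a, b), w in edges.items()
               if assignment[a] != assignment[b])

workload = [{"t1", "t2"}, {"t1", "t2"}, {"t3", "t4"}]
naive = {"t1": 0, "t3": 0, "t2": 1, "t4": 1}    # splits co-accessed pairs
aware = {"t1": 0, "t2": 0, "t3": 1, "t4": 1}    # keeps them together
print(cut_weight(workload, naive), cut_weight(workload, aware))  # 3 0
```

Schism feeds such a co-access graph to a balanced graph partitioner; the toy example just shows why the workload-aware assignment wins.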
The partitioning creates a shared-nothing architecture, giving each RDBMS engine full autonomy. H-Store [90] (commercially known as VoltDB [43]) is a memory-resident database that partitions data across many single-threaded engines. The unit of transaction in VoltDB is the stored procedure, each executed to completion in a single thread.

While optimizing the partitioning scheme helps reduce distributed transactions, it cannot eliminate them for generic workloads. In the case of VoltDB, a priori information about the workload is required, which is not always available. Optimizing to minimize distributed transactions has resulted in redesigned RDBMS engines, a process in which full SQL support has been lost. iEngine uses Innesto, which internally performs automatic partitioning. However, iEngine acts as a distributed storage layer and allows unmodified RDBMS engines to interact with the data, keeping SQL features intact.

MySQL Cluster [29] performs automatic partitioning and scales horizontally. It allows multiple instances of MySQL daemons to run and access the data via multiple different APIs. Data is replicated across data nodes for geo-replication. MySQL Cluster's storage engine, NDB, only supports the read committed isolation level. iEngine, on the other hand, supports generic locking and additionally supports serializable isolation.

6.8.2 Distributed Indexing

Indexing plays a key role in scaling out RDBMS architectures. Options for indexing include hash-based, B-tree based, and spatial indexing. While hash-based indexing provides the fastest mechanism for accessing data, it cannot efficiently support range queries and is useful for only a few use cases.

B-tree based approaches are the most commonly used method and support both item and single-dimension range queries. Most notable is the distributed B-tree [48], which uses the one-round transactions provided by Sinfonia [49]. It scales well beyond a state-of-the-art single-node B-tree (i.e., BerkeleyDB).
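For readers unfamiliar with the Sinfonia model that the distributed B-tree builds on, a minitransaction bundles compare, read and write sets, and the participants apply the whole bundle atomically in a single commit round. A simplified single-node sketch (illustrative, not Sinfonia's actual API):

```python
# Sinfonia-style minitransaction sketch: compares guard the writes, and
# the whole set is applied atomically or not at all.
from dataclasses import dataclass, field

@dataclass
class MiniTxn:
    compares: dict = field(default_factory=dict)  # key -> expected value
    reads: list = field(default_factory=list)     # keys to read
    writes: dict = field(default_factory=dict)    # key -> new value

def execute(store, txn):
    """Abort (return None) unless all compares match; else read and write."""
    if any(store.get(k) != v for k, v in txn.compares.items()):
        return None                        # aborted: caller retries with fresh data
    result = {k: store.get(k) for k in txn.reads}
    store.update(txn.writes)               # applied in one round
    return result

store = {"x": 1}
txn = MiniTxn(compares={"x": 1}, reads=["x"], writes={"x": 2})
print(execute(store, txn), store["x"])     # {'x': 1} 2
```

The compare set is what lets a B-tree operation validate, in the same round that applies its writes, that the nodes it read have not changed.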
However, it works well for read-heavy workloads and suffers under write contention. Minuet [111] is an improved version of the B-tree which supports both OLAP and OLTP queries, but only supports simple transactions. iEngine uses Innesto for indexing, which scales for both read- and write-heavy workloads while also supporting multi-attribute range queries.

Spatial approaches, including Innesto, natively support multi-attribute range queries. BATON [89] provides an overlay over cloud storage. It supports efficient range queries [122] but has a scalability bottleneck since all transactions are serialized through a central component. Innesto is fully distributed and provides the right balance between scalability, strong consistency and multi-attribute range queries. For more related work in this area see Chapter 4.

6.8.3 Distributed Locking

Specialized locking systems, such as Chubby [61] and ZooKeeper [88], provide scalable locking along with strong consistency guarantees. Due to the way locking is implemented in these systems, efficiently implementing complex multi-attribute range locking (a common requirement of an RDBMS) is fundamentally not feasible. Google's Percolator [107] also implements a scalable locking mechanism on top of BigTable, but it only works with snapshot isolation and cannot support serializable isolation, as required by strict consistency. Thus these specialized systems lack the generality and expressibility needed by an RDBMS. iEngine's locking component provides generic distributed range locking on multiple attributes with serializable isolation. It supports atomic logging to the undo log for fault tolerance by using one-round transactions.

6.8.4 NoSQL

With ever-increasing user demands and data sizes, service providers turned to NoSQL to meet their scalability requirements through narrow APIs and weak consistency guarantees with low latency.
NoSQL systems are designed to scale horizontally without the bottlenecks of conventional RDBMSs. Systems such as Cassandra [94], BigTable [65], and Dynamo [76] usually use hash-based data placement to load balance and spread the data across many nodes.

High scalability comes at the price of sacrificing consistency and rich query capabilities. NoSQL systems support query models of different flavors: column-oriented, document-based, key/value, graph-based, etc. While some systems support a variant of range queries, these are far from what SQL provides. Additionally, isolating operations and grouping them into transactions is very hard to achieve. This has led to the two different worlds of SQL and NoSQL systems, each with its own application domain in which it outperforms the other [81].

6.8.5 Scaling Storage

With the scalability and performance of NoSQL on one end and feature-rich SQL queries and transactions on the other, attempts to find middle ground have taken the shape of building SQL over NoSQL. The RDBMS is re-architected by decoupling the storage layer from the query processing engine.

Megastore [51] partitions data into entity groups; consistency is guaranteed within each partition but relaxed across them. ACID transactions are supported within an entity group and are heavily used at Google [20]. ElasTraS [75] uses a two-level hierarchy of Transaction Managers to route transactions to the right partitions and Distributed Storage to execute them. ACID transactions are only provided within a single partition, and only simplified minitransactions are supported across them. Brantner et al. [60] designed the protocols required for building database applications over Amazon's S3 [5], a scalable object datastore, but provide no equivalent of ACID transactions.
While iEngine has a similar architecture, it does not limit consistency to a defined set of boundaries: ACID is provided irrespective of the data touched by a transaction.

Microsoft SQL Azure [63] supports ACID transactions over multiple records, but the database size is constrained to fit on a single node. For larger datasets, the application needs to partition the data among different database instances.

Deuteronomy [97, 98] has an architecture similar to iEngine's, in which data handling and transaction handling are separated from each other. Its lock manager provides three lock granularities: items, partitions and tables. Hence, its range locks are not as efficient as range locking in iEngine.

Another dimension of scaling the storage layer has emerged in the form of using MapReduce to implement SQL. Tenzing [66] is used internally by Google to serve queries over a petabyte of data, but it implements most of SQL, not all of it. InfiniDB [23] uses a MapReduce-like approach to operate on data through a MySQL interface. Queries are parallelized to avoid thread-to-thread or node-to-node communication. Such systems work well for running data analytics on large data but suffer if the database is primarily used to operate on individual items.

6.8.6 NewSQL

NewSQL is a new class of RDBMSs that seek to bridge the gap between NoSQL systems and traditional databases. New engines such as Spanner [71], VoltDB [43] and Clustrix [13] are completely redesigned database platforms. They are designed to operate in a distributed cluster of shared-nothing nodes, in which each node owns a subset of the data [30]. NewSQL engines mostly target applications whose transactions are short-lived with no user stalls, touch a small subset of data with no full table scans or large distributed joins, and are repetitive, executing the same queries with different inputs [30].
They lack workload generality.

Other NewSQL systems aim to provide optimized storage engines for SQL. TokuDB [41] replaces B-tree indexing with fractal tree indexing for MySQL. While fractal trees have asymptotically faster insertions and deletions than B-trees, TokuDB still focuses on single-server functionality. MonetDB [26] also uses a MySQL interface and replaces row-based data storage with column-oriented storage and indexing. Isolation in MonetDB is done using optimistic concurrency control, which is known to start failing quickly under contention.

6.8.7 Scalable Transactional Models

Our focus when building iEngine was to provide distributed equivalents to the main duties performed by the storage layer of a traditional RDBMS, i.e., indexing and locking, without having to modify the engine. In an orthogonal direction, an interesting body of work has recently been done on finding ways to implement transactions in a more scalable manner. DORA [106] explores how to implement a thread-to-data model rather than a thread-to-transaction model to reduce contention. Consistency Rationing [92] explores how to adaptively change the required consistency levels when it matters, to get better performance and cost effectiveness in the cloud. Larson et al. [95] provide concurrency control mechanisms that allow transactions to never block during normal processing; they might have to wait before commit to ensure correct serialization.

The latter seems an interesting approach for iEngine. Isolation is currently done by the locking component. It could be modified to implement different concurrency control mechanisms that better suit the target workload. We leave this avenue for future work.

6.9 Conclusion & Future Work

We present iEngine, a scalable RDBMS which supports full SQL. Instead of re-architecting the RDBMS to scale, iEngine allows numerous unmodified SQL engines to be plugged into a scalable distributed storage layer.
iEngine scales in two independent dimensions: adding more query processing units for OLAP, and scaling the storage layer for higher throughput and capacity. SQL engines bind to the distributed storage layer through a standard storage interface. The front end of the storage layer translates SQL commands to storage functions and ensures that conflicting transactions are globally isolated from each other.

Indexing and locking are done using Innesto, which is built over one-round transactions; a modified Innesto is used for distributed range locking. One-round transactions provide consistency and fault tolerance to protect iEngine from all possible forms of failures. Our evaluation of iEngine shows it scales beyond the capabilities of InnoDB, one of the few fully SQL-compliant systems to provide serializable isolation, using the TPC-C benchmark.

Our goal in building iEngine was to provide a fully distributed and scalable RDBMS without having to reinvent the SQL engine and its well-understood query processing algorithms. Our main contributions were to find distributed equivalents for indexing and locking. Future work on iEngine would be to provide multiple different isolation mechanisms. In the general case where the workload is unknown, pessimistic locking is a suitable approach. However, there is much to gain when optimism is an option. Providing the ability to select between different isolation methods at the database level is a task left for future work.

Chapter 7

Future Work: Argal

Using one-round transactions to build distributed systems simplifies reasoning about concurrency and correctness. Application developers only need to ensure that any assumptions they made when performing a computation are verified at commit time. In the case of concurrent modifications, all but one of the conflicting transactions are rejected.
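In code, this "verify assumptions at commit time" discipline is an optimistic retry loop. A single-process sketch (illustrative; in the real framework the validate-and-apply step is one atomic one-round transaction, and a concurrent writer can make the check fail):

```python
# Optimistic retry loop: snapshot, compute, validate at commit, retry.
def run_transaction(store, keys, compute, max_retries=100):
    for _ in range(max_retries):
        snapshot = {k: store[k] for k in keys}    # assumptions made here...
        writes = compute(snapshot)
        if all(store[k] == v for k, v in snapshot.items()):  # ...checked at commit
            store.update(writes)                  # atomic in the real system
            return writes
        # conflict: another txn won; loop and recompute with refreshed data
    raise RuntimeError("starved: no progress after retries")

store = {"health": 100}
run_transaction(store, ["health"], lambda s: {"health": s["health"] - 25})
print(store["health"])  # 75
```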
The rest have to recompute their operations with refreshed data.

While this provides strong consistency, as in any other system where a limited resource is shared under high demand, fairness may become an issue. In a system with multiple threads of execution, fairness requires all threads to make some progress over time. In a perfectly fair system, multiple identical threads continuously executing transactions should, on average, complete the same number of transactions in any given time period.

Fairness doesn't play a significant role in most systems. Retrying transactions after aborts usually takes a very short time, not observable at a macro scale. Even when the delays are visible, users are accustomed to restarting their current task (e.g., refreshing the webpage) if they feel their session is too slow. However, for our target application of fast-paced FPS games, fairness starts to affect the enjoyability of the game.

In Chapter 5 we presented an architecture for epic-scale FPS games. Players connect to SPEX over the Internet and are provided with regular updates. When a player makes an action, SPEX internally uses one-round transactions to execute it. Since players observe the VE over WAN latencies, what they observe (and act upon) lags behind the latest state. So even though a player acts based on what it sees, its actions may be rejected due to network latency. What aggravates the situation is that if two players observe the same state and make the same action (e.g., shoot at the same target or try to pick up the same item), the player with the lower latency to SPEX will constantly win.

We provide a solution for fairness using one-round transactions. Two key features of FPS games make our solution possible. First, although the exact actions of each player per frame are not predictable, their maximum range of effect can be estimated.
For example, it's unknown which other player a player carrying a specific weapon at some point in the VE will shoot at, but it can only be someone within the weapon's range. With proper interest management, the effects of a player are limited to their interest set. Second, game state may be classified into two separate components: state that is used to decide what actions to perform, and state that is affected by the execution of an action. In FPS games, the positions of other players are an example of state that directly influences decisions, while health is influenced by the execution of actions like shooting.

These two features provide the basis for the reservations used for fairness. At any given time, the actions to be taken by a player are unknown until the state reaches the player's machine and the player's corresponding action comes back at a later time. A reservation reserves a spot in the state of the game for all possible future modifications made by each player. As time progresses and the actual action arrives, the reservation is converted into a one-round transaction and applied. Figure 7.1 shows the general architecture of reservations in Argal.

Parts of the architecture overlap with that of SPEX and may be reused. The exclusive state of a player consists of state that is only modified by that player. It represents the state that drives the action decisions made by other players, such as position and orientation. This information should be propagated as fast as possible: as soon as it arrives, the interest manager and distribution overlay disseminate it. These are the tasks of SPEX described earlier. Shared state is the state of the players that is affected as a consequence of executing actions.
Health and score are examples of shared state which may be modified by multiple sources. Modifications to this state are propagated with a delay, as the reservations are transformed into one-round transactions and disseminated with the same distribution overlay.

Figure 7.1: Architecture of Argal.

Reservations are implemented using clock mode 2 (Figure 3.1c) of the one-round transaction commit protocol, where the final timestamp is decided by the Coordinator rather than by the Participants themselves. A reservation is made by executing only the virtual time voting phase, with dummy items. Once the write items are known, the dummies are replaced with the actual items. The Participants know to postpone executing transactions until their items have been received.

Global ordering of events has been investigated before. Corfu [52] uses a central sequencer to issue tokens to concurrent threads. Data is appended to the end of a shared log in ascending order of tokens. Tokens are similar to reservations: a token can be considered a reservation that reserves all items in the system. However, compared to reservations, which use a distributed voting protocol, tokens are issued by a centralized component. A token-based system suffers from false sharing between concurrent threads, since potentially irrelevant operations have to wait for prior tokens to seal before they can proceed. Slower threads hinder all other threads in the system, even in cases where they don't share any state.

The granularity of reservations is individual items, and only conflicting threads are synchronized with each other. The application using reservations takes two steps to complete each operation. In the first step the one-round transaction is populated with all items the transaction could possibly touch. At some point in the future where enough information is obtained (e.g.
the user action has arrived or a timeout occurs), the actual items are filled in with the required operations and the rest are set to no-ops.

Chapter 8

Conclusion

In this thesis we opted for consistency built over a distributed one-round transaction framework. One-round transactions provide competitive performance compared to normal message-passing transports, while simplifying the programming of distributed applications and reasoning about concurrency. They also protect data against many forms of failure, such as software crashes, hardware failures and full datacenter outages. This allows redundant fault-recovery mechanisms to be factored out of the upper layers of the application stack. All of these contribute to simplifying the end application.

We use one-round transactions to tackle building consistent versions of some notoriously difficult distributed applications. Innesto, our key/value datastore, provides multi-attribute range search with performance comparable to that of a state-of-the-art eventually consistent datastore. SPEX presents a new architecture for epic-scale FPS gaming which circumvents the traditional limitations of large-scale commercial deployments without sacrificing consistency. iEngine takes a direct approach to distributing an RDBMS by pushing isolation semantics down to a distributed storage layer, obviating the need to modify the query processing engine. Thus it fully supports SQL while providing scalability.

One-round transactions can at any time be transformed into normal message passing. In contrast to the traditional approach of trying to provide consistency only where absolutely required at some point in the software stack, our experience with one-round transactions has taught us that it's much easier to start with consistency and move towards relaxing it. The steps needed to port to this framework are to first ensure all shared data is explicitly identified, and that any computations affecting or affected by it convey this information to their one-round transactions.
Additionally, an extra network handler has to be provided to the framework, which ensures proper isolation.

Our applications themselves serve as building blocks for other distributed systems and services. iEngine is a general-purpose RDBMS; a significant portion of online services still use SQL databases for data storage and could benefit from a scalable backend. While we presented SPEX in an FPS gaming scenario, it could be used for any system with a large number of event sources and consumers.

Bibliography

[1] Last visited 2015-01-05. → pages 85
[2] Acid. Last visited 2015-01-05. → pages 12
[3] Amazon ec2. Last visited 2015-01-05. → pages 2, 71
[4] Amazon rds. Last visited 2015-01-05. → pages 13, 90, 97, 108
[5] Amazon s3. Last visited 2015-01-05. → pages 117
[6] Amazon simpledb. Last visited 2015-01-05. → pages 11, 30, 42, 53
[7] Amazon simpledb consistency enhancements. Last visited 2015-01-05. → pages 12
[8] Apache hbase. Last visited 2015-01-05. → pages 51
[9] Cap twelve years later: How the "rules" have changed. Last visited 2015-01-05. → pages 5
[10] Cassandra query language (cql). Last visited 2015-01-05. → pages 30
[11] Causal consistency. Last visited 2015-01-05. → pages 11
[12] Clash of clans. Last visited 2015-01-05. → pages 1
[13] Clustrix. Last visited 2015-01-05. → pages 7, 118
[14] Database index. Last visited 2015-01-05. → pages 93
[15] Dota 2. Last visited 2015-01-05. → pages 1
[16] Dota 2 $10 million tournament. Last visited 2015-01-05. → pages 1
[17] Eve online. Last visited 2015-01-05. → pages 16, 64, 85
[18] Farmville. Last visited 2015-01-05. → pages 1
[19] Google cloud sql. Last visited 2015-01-05. → pages 90, 97, 108
[20] Google megastore - 3 billion writes and 20 billion read transactions daily. Last visited 2015-01-05. → pages 52, 117
[21] Gql. Last visited 2015-01-05. → pages 30
[22] Hypertable. Last visited 2015-01-05. → pages 52
[23] Infinidb. Last visited 2015-01-05. → pages 117
[24] Kd-tree. Last visited 2015-01-05. → pages 31
[25] Lamp.
Last visited 2015-01-05. → pages 13
[26] Monetdb. Last visited 2015-01-05. → pages 118
[27] Muchdifferent's pikkotekk. Last visited 2015-01-05. → pages 64, 85
[28] Mysql. Last visited 2015-01-05. → pages 5, 97
[29] Mysql cluster. Last visited 2015-01-05. → pages 7, 90, 115
[30] Newsql. Last visited 2015-01-05. → pages 118
[31] Partitioning. Last visited 2015-01-05. → pages 31
[32] Real-time messaging protocol. Last visited 2015-01-05. → pages 4
[33] Second life. Last visited 2015-01-05. → pages 3, 16, 56, 61, 69, 71, 85
[34] Second life server architecture. Last visited 2015-01-05. → pages 56, 72, 85
[35] Sequential consistency. Last visited 2015-01-05. → pages 11
[36] Serializability. Last visited 2015-01-05. → pages 11
[37] Simple object access protocol. Last visited 2015-01-05. → pages 3
[38] Sorting in key-value data model. Last visited 2015-01-05. → pages 30
[39] The bloodbath of b-r5rb, gaming's most destructive battle ever. Last visited 2015-01-05. → pages 16
[40] Tpcc mysql. ∼percona-dev/perconatools/tpcc-mysql. Last visited 2015-01-05. → pages 108, 110
[41] Tokudb. Last visited 2015-01-05. → pages 118
[42] Transaction processing performance council. Last visited 2015-01-05. → pages 108
[43] Voltdb. Last visited 2015-01-05. → pages 7, 114, 118
[44] Warp. Last visited 2015-01-05. → pages 52
[45] What's better for your big data application, sql or nosql? Last visited 2015-01-05. → pages 16, 89
[46] World of warcraft. Last visited 2015-01-05. → pages 16, 56, 85
[47] AGARWAL, A., SLEE, M., AND KWIATKOWSKI, M. Thrift: Scalable cross-language services implementation. Tech. rep., Facebook, 4 2007. → pages 4
[48] AGUILERA, M. K., GOLAB, W., AND SHAH, M. A. A practical scalable distributed b-tree. Proc. VLDB Endow. 1, 1 (Aug. 2008), 598–609. → pages 53, 54, 115
[49] AGUILERA, M. K., MERCHANT, A., SHAH, M., VEITCH, A., AND KARAMANOLIS, C. Sinfonia: A new paradigm for building scalable distributed systems.
In Proceedings of Twenty-first ACM SIGOPS Symposium on Operating Systems Principles (New York, NY, USA, 2007), SOSP '07, ACM, pp. 159–174. → pages 4, 6, 18, 23, 53, 54, 115
[50] ATIKOGLU, B., XU, Y., FRACHTENBERG, E., JIANG, S., AND PALECZNY, M. Workload analysis of a large-scale key-value store. SIGMETRICS '12, ACM, pp. 53–64. → pages 15, 39
[51] BAKER, J., BOND, C., CORBETT, J. C., FURMAN, J., KHORLIN, A., LARSON, J., LEON, J.-M., LI, Y., LLOYD, A., AND YUSHPRAKH, V. Megastore: Providing scalable, highly available storage for interactive services. In Proceedings of the Conference on Innovative Data system Research (CIDR) (2011), pp. 223–234. → pages 7, 52, 90, 117
[52] BALAKRISHNAN, M., MALKHI, D., PRABHAKARAN, V., WOBBER, T., WEI, M., AND DAVIS, J. D. Corfu: A shared log design for flash clusters. In Proceedings of the 9th USENIX Conference on Networked Systems Design and Implementation (Berkeley, CA, USA, 2012), NSDI'12, USENIX Association, pp. 1–1. → pages 122
[53] BEIGBEDER, T., COUGHLAN, R., LUSHER, C., PLUNKETT, J., AGU, E., AND CLAYPOOL, M. The effects of loss and latency on user performance in unreal tournament 2003. In NetGames '04 (2004), pp. 144–151. → pages 78
[54] BELLMAN, R. E. Dynamic Programming. Princeton University Press, 1957. → pages 40
[55] BHARAMBE, A., DOUCEUR, J. R., LORCH, J. R., MOSCIBRODA, T., PANG, J., SESHAN, S., AND ZHUANG, X. Donnybrook: enabling large-scale, high-speed, peer-to-peer games. In SIGCOMM '08 (2008), pp. 389–400. → pages 15, 66, 70, 78, 85
[56] BHARAMBE, A., PANG, J., AND SESHAN, S. Colyseus: a distributed architecture for online multiplayer games. In NSDI'06 (2006), pp. 12–12. → pages 69, 85
[57] BHARAMBE, A. R., AGRAWAL, M., AND SESHAN, S. Mercury: supporting scalable multi-attribute range queries. SIGCOMM Comput. Commun. Rev. 34, 4 (Aug. 2004), 353–366. → pages 53
[58] BIRMAN, K. P. Replication and fault-tolerance in the isis system.
In Proceedings of the Tenth ACM Symposium on Operating Systems Principles (New York, NY, USA, 1985), SOSP '85, ACM, pp. 79–86. → pages 4
[59] BOUKERCHE, A., ROY, A., AND THOMAS, N. Dynamic grid-based multicast group assignment in data distribution management. In Proceedings Fourth IEEE International Workshop on Distributed Simulation and Real-Time Applications (DS-RT'00) (2000), p. 47. → pages 87
[60] BRANTNER, M., FLORESCU, D., GRAF, D., KOSSMANN, D., AND KRASKA, T. Building a database on s3. In Proceedings of the 2008 ACM SIGMOD International Conference on Management of Data (New York, NY, USA, 2008), SIGMOD '08, ACM, pp. 251–264. → pages 117
[61] BURROWS, M. The chubby lock service for loosely-coupled distributed systems. In Proc. of the 7th symposium on Operating systems design and implementation (2006), USENIX, pp. 335–350. → pages 116
[62] CAI, M., FRANK, M., CHEN, J., AND SZEKELY, P. Maan: A multi-attribute addressable network for grid information services. GRID '03, IEEE Computer Society, pp. 184–. → pages 53
[63] CAMPBELL, D. G., KAKIVAYA, G., AND ELLIS, N. Extreme scale with full sql language support in microsoft sql azure. In Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data (New York, NY, USA, 2010), SIGMOD '10, ACM, pp. 1021–1024. → pages 117
[64] CASTRO, M., DRUSCHEL, P., KERMARREC, A.-M., AND ROWSTRON, A. Scribe: A large-scale and decentralized application-level multicast infrastructure. IEEE Journal on Selected Areas in Communications 20 (2002). → pages 57, 66, 69
[65] CHANG, F., DEAN, J., GHEMAWAT, S., HSIEH, W. C., WALLACH, D. A., BURROWS, M., CHANDRA, T., FIKES, A., AND GRUBER, R. E. Bigtable: A distributed storage system for structured data. In Proceedings of the 7th USENIX Symposium on Operating Systems Design and Implementation - Volume 7 (Berkeley, CA, USA, 2006), OSDI '06, USENIX Association, pp. 15–15.
→ pages 29, 51, 90, 116
[66] CHATTOPADHYAY, B., LIN, L., LIU, W., MITTAL, S., ARAGONDA, P., LYCHAGINA, V., KWON, Y., AND WONG, M. Tenzing: a sql implementation on the mapreduce framework. In Proceedings of VLDB (2011), pp. 1318–1327. → pages 117
[67] CLAYPOOL, M., AND CLAYPOOL, K. Latency and player actions in online games. Commun. ACM 49, 11 (2006), 40–45. → pages 2, 15, 57, 72
[68] CLEMENTS, A. T., PORTS, D. R. K., AND KARGER, D. R. Arpeggio: metadata searching and content sharing with chord. IPTPS'05, Springer-Verlag, pp. 58–68. → pages 53
[69] COOPER, B. F., RAMAKRISHNAN, R., SRIVASTAVA, U., SILBERSTEIN, A., BOHANNON, P., JACOBSEN, H.-A., PUZ, N., WEAVER, D., AND YERNENI, R. Pnuts: Yahoo!'s hosted data serving platform. Proc. VLDB Endow. 1, 2 (Aug. 2008), 1277–1288. → pages 30, 52
[70] COOPER, B. F., SILBERSTEIN, A., TAM, E., RAMAKRISHNAN, R., AND SEARS, R. Benchmarking cloud serving systems with ycsb. In Proceedings of the 1st ACM Symposium on Cloud Computing (New York, NY, USA, 2010), SoCC '10, ACM, pp. 143–154. → pages 43
[71] CORBETT, J. C., DEAN, J., EPSTEIN, M., FIKES, A., FROST, C., FURMAN, J. J., GHEMAWAT, S., GUBAREV, A., HEISER, C., HOCHSCHILD, P., HSIEH, W., KANTHAK, S., KOGAN, E., LI, H., LLOYD, A., MELNIK, S., MWAURA, D., NAGLE, D., QUINLAN, S., RAO, R., ROLIG, L., SAITO, Y., SZYMANIAK, M., TAYLOR, C., WANG, R., AND WOODFORD, D. Spanner: Google's globally-distributed database. In Proceedings of the 10th USENIX Conference on Operating Systems Design and Implementation (Berkeley, CA, USA, 2012), OSDI'12, USENIX Association, pp. 251–264. → pages 30, 53, 118
[72] COWLING, J., AND LISKOV, B. Granola: Low-overhead distributed transaction coordination. In Proceedings of the 2012 USENIX Conference on Annual Technical Conference (Berkeley, CA, USA, 2012), USENIX ATC'12, USENIX Association, pp. 21–21. → pages 6, 18, 26, 31, 54
[73] CURINO, C., JONES, E., ZHANG, Y., AND MADDEN, S. Schism: a workload-driven approach to database replication and partitioning.
Proceedings of the VLDB Endowment 3, 1-2 (2010), 48–57. → pages 90, 114
[74] CURINO, C., JONES, E. P. C., POPA, R., MALVIYA, N., WU, E., MADDEN, S., BALAKRISHNAN, H., AND ZELDOVICH, N. Relational cloud: a database service for the cloud. In CIDR 2011, Fifth Biennial Conference on Innovative Data Systems Research, Asilomar, CA, USA, Online Proceedings (2011). → pages 90, 114
[75] DAS, S., AGRAWAL, D., AND EL ABBADI, A. Elastras: An elastic transactional data store in the cloud. In Proceedings of the 2009 Conference on Hot Topics in Cloud Computing (Berkeley, CA, USA, 2009), HotCloud'09, USENIX Association. → pages 117
[76] DECANDIA, G., HASTORUN, D., JAMPANI, M., KAKULAPATI, G., LAKSHMAN, A., PILCHIN, A., SIVASUBRAMANIAN, S., VOSSHALL, P., AND VOGELS, W. Dynamo: Amazon's highly available key-value store. SIGOPS Oper. Syst. Rev. 41, 6 (Oct. 2007), 205–220. → pages 29, 51, 90, 116
[77] DOUGLAS, S., TANIN, E., HARWOOD, A., AND KARUNASEKERA, S. Enabling massively multi-player online gaming applications on a p2p architecture. In Proc. Information and Automation (2005), IEEE, pp. 7–12. → pages 86
[78] ESCRIVA, R., WONG, B., AND SIRER, E. G. Hyperdex: a distributed, searchable key-value store. SIGCOMM '12, ACM, pp. 25–36. → pages 31, 32, 39, 43, 52
[79] FIELDING, R. T. Architectural Styles and the Design of Network-based Software Architectures. PhD thesis, 2000. AAI9980887. → pages 3
[80] FINKEL, R. A., AND BENTLEY, J. L. Quad trees: A data structure for retrieval on composite keys. Acta Inf. 4 (1974), 1–9. → pages 61
[81] FLORATOU, A., TELETIA, N., DEWITT, D. J., PATEL, J. M., AND ZHANG, D. Can the elephants handle the nosql onslaught? Proc. VLDB Endow. 5, 12 (Aug. 2012), 1712–1723. → pages 116
[82] GANESAN, P., YANG, B., AND GARCIA-MOLINA, H. One torus to rule them all: multi-dimensional queries in p2p systems. WebDB '04, ACM, pp. 19–24. → pages 53
[83] GAUTHIERDICKEY, C., LO, V., AND ZAPPALA, D. Using n-trees for scalable event ordering in peer-to-peer games. In Proc. NOSSDAV (2005), ACM.
→ pages 69, 72, 86[84] GILBERT, S., AND LYNCH, N. Brewer’s conjecture and the feasibility ofconsistent, available, partition-tolerant web services. SIGACT News 33, 2(June 2002), 51–59. → pages 5, 28[85] HU, S.-Y., CHANG, S.-C., AND JIANG, J.-R. Voronoi state managementfor peer-to-peer massively multiplayer online games. In Proc. IEEE CCNC(2008), IEEE, pp. 1134–1138. → pages 61[86] HU, S.-Y., AND CHEN, K.-T. Vso: Self-organizing spatial publish sub-scribe. In Proceedings of IEEE SASO 2011 (Oct 2011). → pages 7, 57, 58,70, 72, 78, 81, 86[87] HU, S.-Y., WU, C., BUYUKKAYA, E., CHIEN, C.-H., LIN, T.-H., AB-DALLAH, M., JIANG, J.-R., AND CHEN, K.-T. A spatial publish subscribeoverlay for massively multiuser virtual environments. In ICEIE 2010 (2010).→ pages 72, 86[88] HUNT, P., KONAR, M., JUNQUEIRA, F., AND REED, B. Zookeeper: wait-free coordination for internet-scale systems. In Proc. of the 2010 USENIXannual technical conference, USENIX, pp. 11–11. → pages 116[89] JAGADISH, H. V., OOI, B. C., AND VU, Q. H. Baton: A balanced treestructure for peer-to-peer networks. In Proceedings of the 31st InternationalConference on Very Large Data Bases (2005), VLDB ’05, VLDB Endow-ment, pp. 661–672. → pages 115133[90] KALLMAN, R., KIMURA, H., NATKINS, J., PAVLO, A., RASIN, A.,ZDONIK, S., JONES, E. P. C., MADDEN, S., STONEBRAKER, M.,ZHANG, Y., HUGG, J., AND ABADI, D. J. H-store: a high-performance,distributed main memory transaction processing system. Proc. VLDB En-dow. 1 (August 2008), 1496–1499. → pages 90, 114[91] KNUTSSON, B., HONGHUI, L., WEI, X., AND HOPKINS, B. Peer-to-peersupport for massively multiplayer games. In INFOCOM 2004 (mar. 2004),vol. 1, IEEE Press, pp. 96–107. → pages 61, 66, 70, 86[92] KRASKA, T., HENTSCHEL, M., ALONSO, G., AND KOSSMANN, D. Con-sistency rationing in the cloud: Pay only when it matters. Proc. VLDB En-dow. 2, 1 (Aug. 2009), 253–264. → pages 118[93] KRISHNA BALAN, R., EBLING, M., CASTRO, P., AND MISRA, A. 
Ma-trix: Adaptive middleware for distributed multiplayer games. In Proceed-ings of the ACM/IFIP/USENIX 2005 International Conference on Middle-ware (New York, NY, USA, 2005), Middleware ’05, Springer-Verlag NewYork, Inc., pp. 390–400. → pages 86[94] LAKSHMAN, A., AND MALIK, P. Cassandra: A decentralized structuredstorage system. SIGOPS Oper. Syst. Rev. 44, 2 (Apr. 2010), 35–40. → pages29, 32, 43, 51, 52, 116[95] LARSON, P.-A., BLANAS, S., DIACONU, C., FREEDMAN, C., PATEL,J. M., AND ZWILLING, M. High-performance concurrency control mech-anisms for main-memory databases. Proc. VLDB Endow. 5, 4 (Dec. 2011),298–309. → pages 118[96] LEE, Y.-T., AND CHEN, K.-T. Is server consolidation beneficial tommorpg? a case study of world of warcraft. In Proc. 2010 IEEE 3rd In-ternational Conference on Cloud Computing (2010), ACM, pp. 435–442.→ pages 57, 65[97] LEVANDOSKI, J. J., LOMET, D., MOKBEL, M. F., AND ZHAO, K. K.Deuteronomy: Transaction support for cloud data. In Conference on Inno-vative Data Systems Research (CIDR) (January 2011), →pages 117[98] LOMET, D. B., FEKETE, A., WEIKUM, G., AND ZWILLING, M. J. Un-bundling transaction services in the cloud. In CIDR (January 2009). →pages 117134[99] MILLER, J. L., AND CROWCROFT, J. Avatar movement in world of war-craft battlegrounds. In Proceedings of the 8th Annual Workshop on Networkand Systems Support for Games (Piscataway, NJ, USA, 2009), NetGames’09, IEEE Press, pp. 1:1–1:6. → pages 16, 56, 65, 72, 85[100] MINHAS, U. F., LIU, R., ABOULNAGA, A., SALEM, K., NG, J., ANDROBERTSON, S. Elastic scale-out for partition-based database systems. InProceedings of the 2012 IEEE 28th International Conference on Data En-gineering Workshops (Washington, DC, USA, 2012), ICDEW ’12, IEEEComputer Society, pp. 281–288. → pages 90[101] MORSE, K. L., BIC, U., AND DILLENCOURT, M. D. Interest managementin large-scale virtual environments. Presence: Teleoperators and VirtualEnvironments 9, 1 (2000). → pages 57, 58[102] MORSE, K. L., AND STEINMAN, J. S. 
Data distribution management inthe hla: Multidimensional regions and physically correct filtering. In In Pro-ceedings of the 1997 Spring Simulation Interoperability Workshop (1997),Spring, pp. 343–352. → pages 87[103] NAJARAN, M. T., HU, S.-Y., AND HUTCHINSON, N. C. Spex: Scalablespatial publish/subscribe for distributed virtual worlds without borders. InProceedings of the 5th ACM Multimedia Systems Conference (New York,NY, USA, 2014), MMSys ’14, ACM, pp. 127–138. → pages iii, 7[104] NAJARAN, M. T., AND KRASIC, C. Scaling online games with adaptive in-terest management in the cloud. In Proceedings of the 9th Annual Workshopon Network and Systems Support for Games (Piscataway, NJ, USA, 2010),NetGames ’10, IEEE Press, pp. 9:1–9:6. → pages iii, 2, 58, 66, 70, 72, 85[105] PADILHA, R., AND PEDONE, F. Augustus: scalable and robust storage forcloud applications. EuroSys ’13, ACM, pp. 99–112. → pages 30, 53[106] PANDIS, I., JOHNSON, R., HARDAVELLAS, N., AND AILAMAKI, A. Data-oriented transaction execution. Proc. VLDB Endow. 3, 1-2 (Sept. 2010),928–939. → pages 118[107] PENG, D., AND DABEK, F. Large-scale incremental processing using dis-tributed transactions and notifications. In Proceedings of the 9th USENIXConference on Operating Systems Design and Implementation (Berkeley,CA, USA, 2010), OSDI’10, USENIX Association, pp. 1–15. → pages 116135[108] RAO, J., SHEKITA, E. J., AND TATA, S. Using paxos to build a scalable,consistent, and highly available datastore. Proc. VLDB Endow. 4, 4 (Jan.2011), 243–254. → pages 30, 42, 52[109] RATNASAMY, S., FRANCIS, P., HANDLEY, M., KARP, R., ANDSHENKER, S. A scalable content-addressable network. In Proceedings ofthe 2001 Conference on Applications, Technologies, Architectures, and Pro-tocols for Computer Communications (New York, NY, USA, 2001), SIG-COMM ’01, ACM, pp. 161–172. → pages 53[110] SOVRAN, Y., POWER, R., AGUILERA, M. K., AND LI, J. Transactionalstorage for geo-replicated systems. 
In Proceedings of the Twenty-Third ACMSymposium on Operating Systems Principles (New York, NY, USA, 2011),SOSP ’11, ACM, pp. 385–400. → pages x, 10[111] SOWELL, B., GOLAB, W., AND SHAH, M. A. Minuet: A scalable dis-tributed multiversion b-tree. Proc. VLDB Endow. 5, 9 (May 2012), 884–895.→ pages 53, 54, 115[112] STOICA, I., MORRIS, R., KARGER, D., KAASHOEK, M. F., AND BAL-AKRISHNAN, H. Chord: A scalable peer-to-peer lookup service for internetapplications. In Proceedings of the 2001 Conference on Applications, Tech-nologies, Architectures, and Protocols for Computer Communications (NewYork, NY, USA, 2001), SIGCOMM ’01, ACM, pp. 149–160. → pages 19,51[113] STONEBRAKER, M. Why enterprises are uninterested in nosql, 2010. Lastvisited 2015-01-05. → pages 29, 91[114] SUESELBECK, R., SCHIELE, G., SEITZ, S., AND BECKER, C. Adap-tive update propagation for low-latency massively multi-user virtual envi-ronments. In Proc. ICCCN 2009 (2009), IEEE. → pages 58[115] TACIC, I., AND FUJIMOTO, R. M. Synchronized data distribution man-agement in distributed simulations. In in Proceedings of the Workshop onParallel and Distributed Simulation (1998), pp. 108–115. → pages 87[116] TAYARANI NAJARAN, M., AND HUTCHINSON, N. C. Innesto: A multi-attribute searchable consistent key/value store. In International Journal ofBig Data Intelligence, Inderscience Publishers. → pages iii, 6[117] TAYARANI NAJARAN, M., AND HUTCHINSON, N. C. Innesto: A search-able key/value store for highly dimensional data. In CloudCom ’13 (Bristol,UK, 2013), IEEE. → pages iii, 6, 68136[118] TAYARANI NAJARAN, M., AND KRASIC, C. SinfoniaEx : Fault-Tolerant Distributed Transactional Memory. Tech. rep., University of BritishColumbia, Department of Computer Science, 03 2011. → pages 57[119] TAYARANI NAJARAN, M., WIJESEKERA, P., HUTCHINSON, N. C., ANDWARFIELD, A. Distributed locking and indexing: In search for scalableconsistency. In Ladis ’11 (2011). → pages iii, 7[120] UDDIN, M. B., HE, B., AND SION, R. 
Cloud performance benchmarkseries, amazon relational database service (rds) tpc-c benchmark. Tech. rep.,Stony Brook Network Security and Applied Cryptography Lab. → pages 90[121] VARDA, K. Protocol buffers: Google’s data interchange format. Tech. rep.,Google, 6 2008. → pages 4[122] VO, H. T., CHEN, C., AND OOI, B. C. Towards elastic transactional cloudstorage with range query support. Proc. VLDB Endow. 3, 1-2 (Sept. 2010),506–514. → pages 115[123] VOGELS, W. Eventually consistent. Communications of the ACM 52, 1(2009), 40–44. → pages 10, 29[124] WALDO, J. Scaling in games & virtual worlds. ACM Queue 51, 8 (2008).→ pages 16, 56[125] WIJESEKERA, P. Scalable Database Management System (DBMS) archi-tecture with Innesto. Master’s thesis, University of British Columbia, 2012.→ pages iv, 7, 108[126] ZHANG, C., KRISHNAMURTHY, A., AND WANG, R. Y. Skipindex: To-wards a scalable peer-to-peer index service for high dimensional data. Tech.Rep. TR-703-04, Princeton, 2004. → pages 53137

