UBC Theses and Dissertations

Enforcing crash failure semantics in distributed systems with fine-grained object mobility
Duska, Bradley M. (1998)

Enforcing Crash Failure Semantics in Distributed Systems with Fine-Grained Object Mobility

by Bradley M. Duska
B.Sc., University of Calgary, 1989

A THESIS SUBMITTED IN PARTIAL FULFILLMENT OF THE REQUIREMENTS FOR THE DEGREE OF Master of Science in THE FACULTY OF GRADUATE STUDIES (Department of Computer Science)

We accept this thesis as conforming to the required standard

The University of British Columbia
August 1998
© Bradley M. Duska, 1998

In presenting this thesis in partial fulfilment of the requirements for an advanced degree at the University of British Columbia, I agree that the Library shall make it freely available for reference and study. I further agree that permission for extensive copying of this thesis for scholarly purposes may be granted by the head of my department or by his or her representatives. It is understood that copying or publication of this thesis for financial gain shall not be allowed without my written permission.

Department of Computer Science
The University of British Columbia
Vancouver, Canada

Abstract

Migration is a powerful technique in distributed systems, providing many benefits. The granularity of migration ranges from the coarse-grained movement of whole processes to the fine-grained mobility of individual objects, which provides more flexibility and control. One of the costs of fine-grained mobility is an increase in the complexity of programming with respect to failures. Classic fault-tolerance techniques for distributed systems cannot be applied in systems with fine-grained object mobility due to the unacceptable overhead of applying these techniques to many small objects. We discuss a group service that allows programmers to apply classic distributed system fault-tolerance techniques to systems with fine-grained object mobility.
This service enforces the condition that all objects in a group are either all available or all failed, and has been implemented in the Emerald language and runtime environment. Examples using the group service include a fault-tolerant name server and a fault-tolerant distributed system monitor.

Contents

Abstract
Contents
List of Figures
1 Introduction
  1.1 Overview
  1.2 Emerald Background
2 Fault Tolerance
  2.1 Introduction
  2.2 Failures
  2.3 Fault Tolerant Distributed Systems
    2.3.1 Basic Concepts
    2.3.2 Idempotent Stateless Services
    2.3.3 Replication
    2.3.4 Distributed Transactions
    2.3.5 Distributed Transactions With Replication
  2.4 Fine-Grained Object Mobility
3 Group Service
  3.1 Introduction
  3.2 Interface
    3.2.1 Group Class
    3.2.2 Group Object
  3.3 Implementation
    3.3.1 Group Object
    3.3.2 GManager Object
    3.3.3 GRep Object
    3.3.4 Consistency and Data Structure Objects
  3.4 Performance
  3.5 Limitations
4 Examples
  4.1 Introduction
  4.2 Token Passer
  4.3 Name Server
    4.3.1 Interface
    4.3.2 Implementation
  4.4 Distributed Monitor
    4.4.1 Node Status Monitor
    4.4.2 CPU Utilization Monitor
    4.4.3 Link Load Monitor
    4.4.4 Garbage Collection Monitor
5 Related Work
6 Future Work and Conclusions
  6.1 Future Work
  6.2 Conclusions
Bibliography

List of Figures

3.1 Group Service
3.2 New GRep
3.3 Add Member
3.4 Add Member Scaling
3.5 Move One
3.6 Move All
3.7 Failure Message Activity
3.8 Regular Message Activity
4.1 Distributed Monitor
4.2 Node Status Monitor
4.3 CPU Utilization Monitor
4.4 Link Load Monitor 1
4.5 Garbage Collection Monitor

Chapter 1: Introduction

Migration is a powerful technique in distributed systems, providing many benefits: load
balancing through migration to lightly loaded hosts; fault avoidance through migration off a host that is about to fail; performance improvements through migrating computations closer to data or migrating two communicating computations together; security through migrating the processing of sensitive documents to trusted hosts; and mobility through migration to and from mobile hosts before disconnection and after reconnection.

The granularity of migration in distributed systems ranges from the coarse-grained movement of whole processes to the fine-grained mobility of individual objects. Coarse-grained movement of processes is a heavyweight mechanism, involving the movement of the whole process and its associated environment (for example, address space and ports). Fine-grained mobility of objects is a lighter-weight mechanism, generally faster than coarse-grained migration, giving the programmer more flexibility and control. For example, a multi-threaded process may move each of its individual threads to a separate host in order to improve performance. Within one particular thread, it may be advantageous for certain objects to reside on different hosts, taking advantage of specialized hardware or the proximity of data. Some systems allow the migration of objects as small as a few bytes [35]; other systems migrate much larger objects [57]. Migrated objects may be active objects containing a thread, passive objects not currently invoked (quiescent), or passive objects that are currently invoked.

The ability to migrate fine-grained objects provides many benefits, as described above, but also results in complex failure modes for applications. For example, when the objects of a process or thread can be migrated to many nodes in a distributed system, the programmer must deal with the possibility that any object, upon any invocation or reference, may be unavailable due to some failure. This may happen before, during, or after an invocation.
A small piece of a running program, such as an individual object, may be migrated to a host that subsequently fails. Classic fault-tolerance techniques for distributed systems, such as distributed transactions or replication, cannot easily be applied to distributed systems with fine-grained object mobility. To apply these techniques, crash failure semantics must be assumed for services. With fine-grained object mobility, crash failure semantics can only be guaranteed at the level of individual objects, and applying techniques such as distributed transactions or replication at that level results in an unacceptable increase in programming complexity and performance overhead. This leads either to large and complex code for dealing with failures or to programs that are not robust with respect to failures: either programming reliable distributed applications with object migration is difficult and cumbersome, or users are left with an unreliable application. A mechanism is required that allows programmers to gain the advantages of migration while still being able to apply classic fault-tolerance techniques for distributed systems.

We discuss a service that allows programmers to minimize the complexity of failures that can occur in a system with object migration. Specifically, a group service is discussed that allows programmers to:

• Group objects, possibly on different nodes, together in a logical group. The objects in the group are either all available or all failed.
• Notify other objects of the failure of a group.
• Move and fix the group and its members as one.

Groups are generally collections of objects performing some application-level function, for example, a lock on a file or a thread handling an invocation. Making the objects in a group all available or all unavailable means that no failure-handling code is necessary for invocations within a group.
Objects within a group are guaranteed that, if they are able to make an invocation on another group object, that object must be available. By enforcing the condition that all objects in the group are either all available or all failed, classic techniques such as distributed transactions or replication can be used. Notification can be used by other objects to restart or recover from a failed group. Applying move and fix to groups makes it easy to locate and move the group as a whole.

The group service has been implemented in the Emerald language and runtime environment (see Section 1.2). Each object can be a member of at most one group. Objects outside the group are informed of the failure of a group through a callback. If some member of the group becomes unavailable, all members of the group are marked as unavailable, and any other objects that have chosen to be informed are invoked.

1.1 Overview

Chapter 2 discusses the various failure modes of distributed applications, classic distributed systems techniques for dealing with these failures, the added complexity of these failures in systems with fine-grained object mobility, and how a group service controls this additional complexity. Chapter 3 discusses the Emerald version of the group service, including its interface, implementation, performance, and limitations. Chapter 4 discusses experiences with the group service, focusing on two example applications and their interaction with failures: the first is a reliable name server, the second a distributed system monitor. Finally, Chapter 5 and Chapter 6 discuss related and future work, and give conclusions.

1.2 Emerald Background

Emerald [35] is a distributed, object-based language and environment providing abstract types and fine-grained object mobility, originally developed as a follow-on to the Eden project [8]. Emerald is intended to run on local area networks.
An important goal in Emerald is to optimize the performance of the common case. Objects are the unit of distribution and mobility in Emerald. Emerald uses a single object model: an object can be as small as an integer or as large as a process. Objects are composed of a network-wide unique id, data local to the object (either primitive data or references to other objects), operations that can be invoked on the object, and an optional process. An object that contains a process is said to be active; otherwise it is passive. Synchronization is provided by monitors.

The Emerald language provides abstract types. The abstract type of an object is given by the operations provided by that object and those operations' signatures. An object conforms to an abstract type if it implements at least the operations of that abstract type and their parameters conform in the proper way. This implies that each object can conform to a number of different types, and that an abstract type can be implemented by many different objects. Emerald provides no class hierarchy; conceptually, each object carries its own code.

Emerald provides explicit language support for mobility. The node type is an abstraction of a physical machine. The locate primitive returns the node where an object resides. Movement of objects between nodes is controlled using the move, fix, refix, and unfix primitives. For example, move X to Y moves object X to node Y; if Y is another object, X is moved to the node where Y resides. The move primitive is only a hint for location. The fix primitive provides stronger semantics: it moves an object and does not allow it to be moved again until the object is unfixed or atomically refixed at another node. When an object is moved, in the simplest case only the object and any primitive data within that object are moved. To explicitly indicate objects that should move together, the attach primitive is used: when an object is moved, any attached variables are also moved.
The attach relationship is transitive but not symmetric: when an attached variable is moved via the attach relationship, any objects attached to that variable are also moved; however, if an object referenced by an attached variable is explicitly moved, the object it is attached to does not move.

A process is treated as a stack of activation records. When a process performs a remote invocation, the new activation record moves to the remote node to become the base of a new segment of the process stack on that node. Therefore the invocation stack of an Emerald process can be distributed across several nodes.

To provide programmer support for failures, the Emerald language supports an exception mechanism. Any block of code can have an associated failure handler that is invoked if a failure occurs in the code. Failures include invoking a nil reference, dividing by zero, and asserting false. A specific kind of failure is an unavailable object, caught by an unavailable handler; this occurs when an object cannot be located due to a failure. When an object fails, Emerald does not immediately fail all processes with active invocations that have passed through that object; these invocations fail when they attempt to return to the object.

Emerald objects that do not change their state over time are said to be immutable. For example, integer objects are immutable: the integer 3 cannot be changed to the integer 4. More complex objects can be marked as immutable using the immutable keyword. Since an immutable object never changes state, when a reference to an immutable object is moved to another node, a copy of the immutable object is moved to that node. As a result, an immutable object is never unavailable.
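
The attach rules described in Section 1.2 (a move follows attach edges transitively, but never in the reverse direction) can be illustrated with a small model. This Python sketch is not Emerald code and makes no claim about how the Emerald runtime implements attach; `Obj`, `attached`, and `move` are hypothetical names chosen for illustration, and nodes are plain strings.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass(eq=False)  # eq=False keeps identity-based hashing, like distinct objects
class Obj:
    """Hypothetical stand-in for an object: a name, its current node, and attach edges."""
    name: str
    node: str
    attached: List["Obj"] = field(default_factory=list)  # objects attached to this one


def move(obj: Obj, node: str, seen: Optional[Set[Obj]] = None) -> None:
    """Move obj to node, then follow attach edges transitively (never in reverse)."""
    seen = set() if seen is None else seen
    if obj in seen:  # attach graphs may contain cycles; visit each object once
        return
    seen.add(obj)
    obj.node = node
    for child in obj.attached:
        move(child, node, seen)


cache = Obj("cache", "n1")
index = Obj("index", "n1", [cache])
server = Obj("server", "n1", [index])

move(server, "n2")  # server, index and cache all move to n2
move(index, "n3")   # index and cache move to n3; server stays on n2
print(server.node, index.node, cache.node)  # n2 n3 n3
```

Explicitly moving `index` carries `cache` along but leaves `server` behind, which is the "transitive but not symmetric" behaviour. The `seen` set is a choice made for the sketch so that cyclic attach graphs terminate.
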

Chapter 2: Fault Tolerance

2.1 Introduction

This chapter discusses a taxonomy of failures, standard techniques used to create fault-tolerant distributed systems, and the additional complexity that fine-grained object mobility introduces for fault-tolerant distributed systems.

2.2 Failures

Cristian [20] provides a taxonomy of fault-tolerant distributed systems. The basic architectural concepts used are service, server, and the depends-on relation. A service is a collection of operations exposed as an interface through which service users execute these operations. A server (also known as a component) performs the operations specified in a service without exposing the implementation to the service users. A depends-on relation exists between two servers when the correct operation of one server depends on the correct behaviour of the other; the first server is then said to depend on the second. A server at one level of abstraction may be a client of a server at a lower level of abstraction.

A server is deemed correct if, in response to inputs, it behaves according to the service specification. A server failure occurs when the server does not behave according to the service specification. An omission failure is one in which the server does not respond. A timing (or performance) failure is one in which the server responds, but not within the expected time interval; real-time systems are particularly concerned with timing failures. A response failure occurs when the server responds incorrectly, either by returning incorrect output (a value failure) or by moving to an incorrect state (a state transition failure). If, after the first omission failure, the server does not respond until it is restarted, the server is said to suffer a crash failure. The recovery from a failed server depends on the failure semantics of the server: the likely failure behaviours of the server.
For example, the UDP service has omission but not value failure semantics: UDP does not guarantee that all packets will be delivered, but any packet that is delivered is protected from corruption by a checksum. A server that may fail in any manner is said to have arbitrary failure semantics (such failures are also known as Byzantine failures). In general, it is simpler and more efficient to provide fault tolerance on top of stronger failure semantics. For example, providing fault tolerance on top of a UDP service that has only omission semantics is simpler, and involves less execution overhead, than providing fault tolerance on top of a UDP service that has omission and value semantics (where UDP may deliver a corrupt packet).

One particularly difficult arbitrary failure that may occur in distributed systems is a network partition. A network partition occurs when the systems that make up the distributed system are split into two or more groups, such that communication within a group is possible but communication between groups is not. The difficulty comes from the fact that each group may perform recovery based on the assumption that the other groups have failed, when in fact the other groups are still active.

Failures at a lower abstraction level may become a different kind of failure at a higher abstraction level. For example, a corrupted UDP packet (a value failure) becomes an omission failure to the application when the UDP server does not deliver the packet. An exception-handling mechanism can be used to propagate failures across abstraction levels and to mask low-level failures from higher-level servers (hierarchical masking). A server can attempt to recover from a failure by masking it; if that is unsuccessful, the failure can be propagated to the next higher abstraction level.
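
Hierarchical masking can be sketched in a few lines. In this illustrative Python sketch, `zlib.crc32` stands in for the UDP checksum (UDP itself uses a 16-bit ones'-complement checksum), an iterator of packets simulates successive transmissions, and `receive`, `request`, and `ServiceUnavailable` are hypothetical names. The lowest level turns a detected value failure into an omission, the next level masks omissions by retrying, and a failure that cannot be masked is propagated upward as an exception.

```python
import zlib
from typing import Iterator, Optional, Tuple

Packet = Tuple[bytes, int]  # (payload, checksum)


class ServiceUnavailable(Exception):
    """Raised when a lower-level failure cannot be masked and is propagated upward."""


def make_packet(payload: bytes) -> Packet:
    return payload, zlib.crc32(payload)


def receive(packet: Optional[Packet]) -> Optional[bytes]:
    """Lowest level: a corrupted packet is detected and dropped, so a value
    failure reaches the caller as an omission (None), just like a lost packet."""
    if packet is None:
        return None  # lost in transit: omission failure
    payload, checksum = packet
    if zlib.crc32(payload) != checksum:
        return None  # corrupted: value failure converted into an omission
    return payload


def request(channel: Iterator[Optional[Packet]], max_attempts: int = 3) -> bytes:
    """Next level: mask omissions by retrying; propagate if masking fails."""
    for _ in range(max_attempts):
        reply = receive(next(channel, None))
        if reply is not None:
            return reply
    raise ServiceUnavailable(f"no valid reply after {max_attempts} attempts")


good = make_packet(b"ok")
corrupt = (b"oK", good[1])  # payload damaged in transit; checksum is now stale
print(request(iter([None, corrupt, good])))  # two failures masked: b'ok'
try:
    request(iter([corrupt, corrupt, corrupt]))
except ServiceUnavailable as exc:
    print("propagated:", exc)
```

The retry bound marks the point at which this level stops masking and propagates the failure; the value 3 is arbitrary.
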
A server that either provides a service or signals an exception without changing its state considerably simplifies fault-tolerant programming by providing an easily understood omission failure.

A question that needs to be addressed in the design of a server is when the probability of a failure is small enough to be considered negligible. For example, how likely is it that UDP will return a corrupted packet that is not detected by the UDP checksum? If the probability of this occurring is unacceptably high, then weaker failure semantics must be assumed for the UDP service. In addition to functional requirements, a server must give a stochastic specification: the probability that the standard behaviour is observed, as well as the probability that a failure outside the server's anticipated failures occurs.

2.3 Fault Tolerant Distributed Systems

Techniques for achieving fault tolerance in distributed systems can be broadly divided into three categories: idempotent stateless services, replication, and distributed transactions. Idempotent stateless services provide the simplest form of fault tolerance, suited to problems where the server holds no state information for clients and operations can be executed one or many times with the same result. Replication is the use of a group of redundant servers, with a switch to a working server in the case of a server failure. Events must be ordered consistently across the servers. Approaches to replication include lazy replication, process groups, and available copies. A particular concern with replication is correctly handling network partitions. Distributed transactions are sets of operations over one or many servers that are guaranteed to be atomic, even in the case of failure. To make transactions atomic and durable, the data affected by a transaction must be recoverable after an unexpected failure. Concurrency control techniques are used to improve performance while guaranteeing serially equivalent execution.
With distributed transactions, an atomic commit protocol is required between servers, and concurrency control techniques must correctly handle distributed deadlock. Replication and distributed transactions may be used together; for example, a highly available fault-tolerant service could use distributed transactions over replicated servers. Combining replication and distributed transactions requires that one-copy serializability be maintained between replicas. Portions of this section draw from [19].

2.3.1 Basic Concepts

Several basic distributed system concepts are common to both replication and distributed transactions. These include logical time, serial equivalence, distributed mutual exclusion, and the use of a distributed coordinator.

Logical time is a technique used to order events that occur at different systems. Lamport [41] introduced an ordering in distributed systems called happened before (also known as causal ordering). This ordering uses two basic ideas: events within one process occur in the order in which they are observed, and a message is sent before it arrives at the receiver. These relations are transitive. To turn this into a logical clock, Lamport [41] added a monotonically increasing timestamp to each process that is incremented before each event and piggybacked on messages between processes. On the receipt of a message, the receiver sets its timestamp to the maximum of its local timestamp and the timestamp in the message, and then increments it for the receive event. Using these rules, it can be shown that if one event happened before another, then the first event has the lower timestamp.

A typical server will have many clients requesting services simultaneously. The risk is that two clients' use of the service may conflict and place the server in an inconsistent state.
One solution is simply not to allow updates to occur simultaneously: each use of the service is executed in its entirety before the next client is allowed to use the service. This is known as serial execution, and it removes any concurrency from the operation of the server. To increase concurrency, clients can be serviced concurrently as long as their use of the server is serially equivalent: the combined effect of the clients' use of the service is equivalent to some serial execution.

In many cases a resource is shared between many processes, and each process must temporarily gain exclusive access to the shared resource. This requires some technique for achieving distributed mutual exclusion. Three basic requirements for distributed mutual exclusion are safety, liveness, and causal ordering. Safety is the property that at most one process has access to the shared resource at any one time. Liveness is the property that any process needing access to the shared resource eventually gets it. Causal ordering is the property that access is granted in the happened-before order of the requests.

The simplest way to achieve distributed mutual exclusion is to use a central server as the exclusive granter of access rights to the shared resource. Each process requests access to the resource from the server, and the server grants it when the resource is free. Although this method is simple, the server can become a performance bottleneck, the server is a critical point of failure, and recovering from client failure is difficult. To remove these exposures, a technique based on distributed agreement can be used. The method of [51] uses a multicast message to request access to the shared resource, with access granted only after all other processes have replied. Each process keeps a local state for the access: released, wanted, or held. When a process wants access to the shared resource, it sets the state to wanted and multicasts a request message to all other processes.
When all requests have been replied to, the process sets the state to held and is able to access the shared resource. If another process also wants access to the shared resource at the same time, timestamps based on logical clocks are used to order the requests in happened-before order, determining which process logically requested access first. This technique is more expensive than the central server technique in that every process is involved in every access to the shared resource. Also, the failure of any process must be handled, or else liveness will be violated.

A simpler distributed technique for achieving mutual exclusion is to arrange the processes in a logical ring and to pass an access token around the ring. A process can only access the shared resource when it holds the token. If the token holder fails, a new token must be generated. Unfortunately, it is difficult to distinguish the failure of the token holder from a network partition.

It is common for a distributed system to need one system to act as a distributed coordinator. Fixing some system as coordinator makes the distributed system vulnerable to the failure of that system. A solution is to use an election algorithm to select the coordinator. Two election algorithms are the bully algorithm and a ring-based algorithm.

The bully algorithm [60] uses a unique identifier for each member, with each member knowing all other members and their identifiers. Three kinds of messages are used: an election message announces an election, an answer message responds to an election message, and a coordinator message announces the new coordinator. When a member notices that the coordinator has failed, it sends an election message to all members with a higher identifier. Upon receiving an election message, a member sends an answer message and begins an election unless one is already active.
If a member does not receive any answers to its election messages, it assumes it is the coordinator and announces this with a coordinator message to all members with lower identifiers. The bully algorithm is able to withstand failures of members during an election through the use of timeouts.

A ring-based algorithm [15] works with a group of processes arranged in a logical ring. Each process has a unique identifier and is able to communicate with its neighbour in one direction around the ring (either clockwise or counterclockwise). A process starts an election by marking itself as a participant and sending an election message containing its identifier to its neighbour. Upon receiving an election message, a process compares the received identifier with its own. If the received identifier is lower than its own, the receiver places its own identifier in the election message and forwards it to its neighbour. If the received identifier is higher than its own, the receiver simply forwards the election message to its neighbour. If the received identifier is equal to its own, then this process is the new coordinator: it marks itself as a non-participant and sends an elected message containing its identifier to its neighbour. Upon receiving an elected message, a process marks itself as a non-participant, records the identity of the new coordinator, and forwards the elected message to its neighbour. This algorithm is not able to withstand failures of processes during an election.

2.3.2 Idempotent Stateless Services

Idempotent stateless services provide a simple form of fault tolerance. A server is stateless if it holds no information for particular clients. A service is idempotent if executing an operation multiple times is equivalent to executing it once. By being stateless, a server can tolerate the failure of clients.
If the server provides idempotent operations, clients can tolerate the failure and subsequent restart of the server. To guarantee the permanence of changes to data, a server writes the changes to a recovery file before responding to the client; recovery file techniques are described in Section 2.3.4. To allow for continuous service in spite of server failures, a server can use replicas, as described below.

2.3.3 Replication

Replication is the use of a group of redundant servers, with a switch to a working server in the case of a server failure. In this way the failure of a server in the group can be masked by another server performing the requested work. This is known as group masking [20]. The output from a service request is known as the group output and is a function of the individual member outputs. A server group that can survive any $k$ concurrent member failures is called $k$-fault tolerant. If each member of the server group has crash and performance failure semantics, $k + 1$ servers are needed to provide $k$-fault tolerance, since a single surviving member suffices. If each member of the server group has arbitrary failure semantics, $2k + 1$ servers are needed to provide $k$-fault tolerance, so that the $k + 1$ correct members can outvote up to $k$ faulty ones. The challenge in replication is to maintain consistency between the replicas without unduly impacting performance.

Replication can occur in hardware, in software, or both. In a closely synchronized group, each member executes the same sequence of requests in parallel. In a loosely synchronized group, one or more primary servers process the service requests, while the other servers periodically receive state updates from the primary servers. With hardware replication, the switch from a failed to a working component is kept consistent either automatically in hardware or by using the system software to mask the failure and switch components [3] [38] [16]. Failure detection can use either error codes or duplicate components that compare results.
For example, [9] uses a software layer between the operating system and the hardware, called the hypervisor. The hypervisor coordinates the execution of instructions on both a primary and a backup processor, with a switch to the backup processor on the failure of the primary.

With software replication, a replicated software server must be able to provide consistent ordering of requests across all replicas with minimal impact on performance. Ordering can be defined in various ways: causal, total, or sync ordering. Causal ordering captures the happened-before relation described above: event a is ordered before event b if a occurs before b in the same process, if b is the receipt of a message sent at a, or if a and b are related transitively through such steps. Total ordering occurs when all operations are performed at each replica in the same order: either event a is ordered before event b at all replicas, or b is ordered before a at all replicas. It is not total ordering if a is ordered before b at some replicas and b before a at others. Sync ordering occurs when an operation is performed at all replicas at the same time, so that all other operations can be split into those happening before and those happening after the current operation. For example, if a is a sync-ordered event, event b will not be processed by any replica until a has been processed by all replicas.

It is convenient to use a basic architectural model for replicated data [19]. The components of this model are clients, front ends, and replica managers. A client performs operations, either reads or writes, on the replicated data. A client's operation is handled by a front end, which communicates the operation to one or more replica managers. Each replica manager is responsible for managing a set of replicated data. To achieve a desired ordering, replica managers place operations that cannot yet be performed on a hold-back queue until all operations that must precede them have been performed [56].
An operation is stable at a replica manager once all operations that must precede it have been performed. The hold-back mechanism must satisfy safety and liveness properties similar to those described above for mutual exclusion. Safety is the property that no operation is performed out of order by being taken off the hold-back queue early. Liveness is the property that no operation remains on the hold-back queue indefinitely.

Many algorithms exist for achieving both causal and total ordering. Causal ordering can be achieved through the use of vector timestamps [49]. The logical clock algorithm described in Section 2.3.1 is not adequate to deduce causal ordering, as its timestamps do not carry enough information. For example, if the timestamp for event a is less than the timestamp for event b, we cannot conclude that event a happened before event b (although the converse is true: if event a happened before event b, then the timestamp for event a is less than the timestamp for event b). Vector timestamps solve this by carrying more information. A vector timestamp contains one entry for each replica manager, counting the events seen at that replica manager. Each event carries a unique vector timestamp, generated by incrementing the entry for the replica manager that handled the event. An event happened before another event if each element of its timestamp is less than or equal to the corresponding element of the other event's timestamp, and the two timestamps are not identical. Each replica manager keeps a timestamp of the latest event applied at that replica, and a new event is applied to the replica when all events that happened before it have been applied. Each front end also maintains a vector timestamp representing the latest state it has read; the front end must next read from a replica manager that is at least as advanced.
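
The difference between the two kinds of timestamp can be seen in a small simulation. This Python sketch applies Lamport clock rules (taking the maximum on receipt and then incrementing for the receive event) and the vector timestamp rules described here to two processes standing in for replica managers. `LamportClock`, `VectorClock`, `happened_before`, and `concurrent` are hypothetical names, and message transport is simulated by passing timestamps directly.

```python
from typing import List, Tuple

VTime = Tuple[int, ...]


class LamportClock:
    def __init__(self) -> None:
        self.time = 0

    def tick(self) -> int:
        """Local or send event: increment before the event."""
        self.time += 1
        return self.time

    def receive(self, msg_time: int) -> int:
        """Receive event: take the maximum, then increment."""
        self.time = max(self.time, msg_time) + 1
        return self.time


class VectorClock:
    def __init__(self, n: int, i: int) -> None:
        self.v: List[int] = [0] * n  # one entry per process
        self.i = i                   # index of this process's own entry

    def tick(self) -> VTime:
        self.v[self.i] += 1
        return tuple(self.v)

    def receive(self, msg_v: VTime) -> VTime:
        self.v = [max(a, b) for a, b in zip(self.v, msg_v)]  # element-wise merge
        self.v[self.i] += 1
        return tuple(self.v)


def happened_before(a: VTime, b: VTime) -> bool:
    """a happened before b iff every entry of a is <= its entry in b and a != b."""
    return all(x <= y for x, y in zip(a, b)) and a != b


def concurrent(a: VTime, b: VTime) -> bool:
    return not happened_before(a, b) and not happened_before(b, a)


lc0, lc1 = LamportClock(), LamportClock()
vc0, vc1 = VectorClock(2, 0), VectorClock(2, 1)

e1 = (lc0.tick(), vc0.tick())                        # P0 local event: (1, (1, 0))
e2 = (lc0.tick(), vc0.tick())                        # P0 local event: (2, (2, 0))
f1 = (lc1.tick(), vc1.tick())                        # P1 local event: (1, (0, 1))
send = (lc0.tick(), vc0.tick())                      # P0 sends:       (3, (3, 0))
recv = (lc1.receive(send[0]), vc1.receive(send[1]))  # P1 receives:    (4, (3, 2))

print(f1[0] < e2[0], concurrent(f1[1], e2[1]))  # True True
print(happened_before(send[1], recv[1]))         # True
```

The first printed line shows why logical clocks alone cannot deduce causal ordering: f1 has a lower Lamport timestamp than e2, yet the vector timestamps show that the two events are concurrent.
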
Total ordering can be achieved using a sequencer or some form of distributed agreement. A sequencer [36] is simply a central process that assigns sequence numbers to all requests. This implies that the sequencer must be involved with each request, making the sequencer a potential performance bottleneck and a failure exposure. In order to avoid a central sequencer, distributed agreement can be used to order requests. [7] suggests an algorithm in which replica managers generate initial identifiers for events as they arrive, and the front ends use these initial identifiers to generate final identifiers. A front end sends a new event to all replica managers. Each replica manager stores its own largest suggested identifier in $P_{max}$, the largest final identifier in $F_{max}$, and the number of replica managers in $N$. On receiving a new event, a replica manager returns $\max(P_{max}, F_{max}) + i/N + 1$ to the front end as the suggested identifier for the new event, where $i$ is the identifier for this replica manager. After receiving all suggested identifiers for the new event, the front end returns the maximum of the suggested identifiers to the replica managers as the final identifier.

A lazy update scheme is used in [40]. Update messages are only sent between replica managers when several can be grouped together or when a replica manager needs an update to handle an event. In this system, each front end usually communicates with one replica manager, unless that replica manager has failed or is heavily loaded, in which case the front end can communicate with another replica manager. Each event is designated as causal, forced (causal and total), or immediate (sync) ordered.

One particular approach to replication uses process groups: groups of processes that work together to accomplish a task. Process groups need both a group membership service and some form of fault-tolerant multicast. Each process has a list of the current group members called a view.
Virtual synchrony [6] requires all processes in a group that receive a message to have the same view when the message is received. The membership service must maintain virtual synchrony in spite of group members failing and new members joining. Different forms of fault-tolerant multicast are used to provide different reliability and ordering guarantees. Groups can be categorized by the patterns of communication the members of the group are involved in [19]. A peer group is one in which all communication is directed from members of the group to members of the group. A server group is a collection of servers that all handle clients' requests. A client-server group is one in which each client is included in a server group. A subscription group is a group of processes that all receive the same information from some information source. A hierarchical group is a way of managing large, complex groups by structuring them in a hierarchy.

Examples of process groups include Isis [6], Horus [63], and Totem [47]. Isis maintains a list of the current members of each group, which changes in a sync-ordered and failure-respecting manner. Isis provides unordered, causally ordered, totally ordered, or sync-ordered multicast for communication within the group. No consideration is given to network partitions. Recent work has focused on the formal specification of process groups [23], and on extending the process group approach to the CORBA [28] environment [46] [48].

A primary/copy model of replication is used in [43]. In this model, all write events are handled by a designated primary replica manager, which then propagates the event to all other replica managers. Read events can be handled by any replica manager. A more general version of primary/copy replication is a read one/write all scheme. In this scheme, reads and writes can be directed to any replica manager, but writes must be propagated to all replica managers.
Primary/copy cannot continue if the primary server fails, and the primary server is also a potential performance bottleneck. Read one/write all cannot continue if any one of the servers fails. Available copies replication allows for some replica managers to be unavailable [53]. Read operations can be satisfied by any available replica manager, and write operations are directed to all available replica managers. Available copies does not perform correctly in the event of a network partition: a group of replica managers will assume the other groups of replica managers have failed and continue updating the shared data.

Replication schemes that are able to perform correctly in spite of network partitions can be categorized as either optimistic or pessimistic [22]. Optimistic schemes assume inconsistencies between the partitions will not occur and do not limit availability during a potential partition. Pessimistic schemes assume inconsistencies will occur and limit availability during a potential partition. An optimistic scheme is available copies with validation [21]. In this scheme, partitions use available copies as usual. When the partition is repaired, any conflicts between the partitions must be manually fixed. A pessimistic scheme is quorum consensus [26] [31]. In this scheme, a partition is allowed to perform an operation only if it has enough members (for example, a majority). The virtual partition algorithm [1] attempts to combine the correctness of the quorum consensus scheme with the performance of the available copies scheme.

2.3.4  Distributed Transactions

Distributed transactions are a set of operations over one or many servers that are guaranteed to be atomic, even in the case of failure. They provide consistent and concurrent access to long-lived shared data, maintaining the service in a consistent state. Transactions are usually started by a BeginTransaction operation, and ended by an EndTransaction operation.
The BeginTransaction operation returns a unique identifier for this transaction. Any operations on the server between these calls that include this identifier are part of the transaction. These operations typically read or write the data items at the server. A transaction is committed after a successful call to EndTransaction. A client or a server may abort a transaction at any time before it is committed, undoing any effects of the transaction. To support aborting transactions, a server keeps a tentative version of items updated by a transaction in volatile memory until the transaction has committed.

Distributed transactions use many concepts from single-server transactions. An important requirement for single-server transactions is to maintain the ACID properties: atomic, consistent, isolated, and durable [30]. A transaction is atomic if the effects of the transaction are all or nothing, even when the server or client fails. A transaction is consistent if it moves the service from one consistent state to another. Consistency is usually the responsibility of the application using the transaction service. A transaction is isolated if it has no effects on other transactions while it is being processed. A transaction is durable if all of the effects of the transaction are permanent after it has completed.

To make transactions atomic and durable, the data affected by a transaction must be recoverable after an unexpected failure. Each transaction has an associated intentions list: a list of the names and values of all data altered by the transaction. By the time a server responds that a call to EndTransaction was successful, the intentions list must be saved to a recovery file in permanent storage. It is the job of the recovery manager to save intentions lists in the recovery file and manage that file, and, after a failure, to abort any uncommitted transactions and restore the most recently committed transactions [19].
Recovery files can be organized either as logs or as shadow versions. In the simplest form of logging, the recovery file records all transactions performed by the server. To keep the recovery file from growing too large and to improve recovery time, a log can be organized as a checkpoint of the server data plus all transactions since the checkpoint. Before a server responds that an EndTransaction call was successful, the recovery manager appends the intentions list for the transaction to the recovery file. If the server fails, at most the final append to the recovery file will be incomplete. After a failure, the recovery manager restores the data values in all complete entries in the recovery file, working from the most recent entry backwards.

Another way to organize recovery files is to use shadow versions. In this case, all data values and updates to these values are in a version-store file. A map contains pointers to the current values of all data items. When a transaction commits, all new data values are appended to the version-store file, a copy of the map is made, the appropriate data pointers in the new map are changed to point to the new values, and the new map becomes the current map.

An alternative to disk as stable storage is to use persistent memory. Vista [44] provides atomic and durable services for transactions using the Rio [17] file cache. The Rio file cache, combined with an uninterruptible power supply, survives operating system crashes and provides persistent memory. For working sets that fit entirely in main memory, combining the Vista transaction service with the Rio file cache reduces transaction overhead by a factor of 2000.

To handle client failures, a server can give each transaction a timeout and abort any transactions not completed by the timeout. A client can handle a failed server either by simply returning an error to the application user, or by attempting to recover from the failed server, for example by trying an alternate server.
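The simplest form of logging described above can be sketched in Python as follows. The sketch omits checkpoints, models the recovery file as an in-memory list, and represents a crash during the final append with a hypothetical `complete` flag; it illustrates the backward scan only, not a real recovery manager.

```python
from typing import Dict, List, Tuple

# One log entry: (transaction id, intentions list, complete flag).
# The flag models whether the append finished before a crash.
Entry = Tuple[str, Dict[str, int], bool]


class RecoveryLog:
    def __init__(self) -> None:
        self.entries: List[Entry] = []

    def commit(self, tid: str, intentions: Dict[str, int], crash: bool = False) -> None:
        # Append the intentions list before replying that EndTransaction
        # succeeded; a crash mid-append leaves an incomplete final entry.
        self.entries.append((tid, dict(intentions), not crash))

    def recover(self) -> Dict[str, int]:
        """Scan from the most recent entry backwards; the first value found
        for each data item is its most recently committed value."""
        restored: Dict[str, int] = {}
        for _, intentions, complete in reversed(self.entries):
            if not complete:
                continue  # only the final append can be incomplete
            for item, value in intentions.items():
                restored.setdefault(item, value)
        return restored


log = RecoveryLog()
log.commit("T1", {"a": 1, "b": 2})
log.commit("T2", {"b": 5})
log.commit("T3", {"a": 9}, crash=True)   # server fails during this append
print(log.recover())   # {'b': 5, 'a': 1}
```

Scanning backwards means older values for an item are skipped once a newer committed value has been restored, so T1's value of b is ignored and T3's incomplete entry never takes effect.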
To make transactions isolated, transactions can be performed one at a time. Unfortunately, this provides poor performance. To increase performance, transactions can execute concurrently as long as the transactions are serially equivalent. Concurrency control provides techniques to increase concurrency while maintaining serial equivalence. If serial equivalence is not maintained, transactions may suffer lost updates or inconsistent retrievals. A lost update occurs when two transactions both read an old value for a data item, and then both update the data; the first update for the data is lost. An inconsistent retrieval occurs when one transaction reads partially updated data from another transaction. When the order of execution of two transactions determines their combined effect, the transactions are said to conflict.

A transaction that aborts can cause complex interactions with concurrent transactions through both dirty reads and premature writes. A dirty read is a transaction reading a value set by another transaction that subsequently aborts. A premature write is a transaction writing a value to a data item that a previous, still uncommitted transaction has updated. If the previous transaction now aborts, the write will be incorrect. To ensure recoverability, any transaction that has seen the effects of another uncommitted transaction delays committing until the previous transaction has committed or aborted. If the previous transaction aborts, any transactions that saw the effects of this transaction will also have to abort. This can lead to cascading aborts, where the abort of one transaction causes many transactions to abort. To avoid cascading aborts, a transaction service can be made strict: a transaction's read or write of a data item is delayed until all uncommitted transactions that wrote the data item commit or abort. Three main concurrency control techniques are locking, optimistic concurrency control, and timestamp ordering.
With locking, access to data occurs in the order in which different transactions request a lock for the data. In the simplest version of locking, each transaction places a lock on a required data item that guarantees exclusive access to the data until the lock is released. In order to guarantee serial equivalence, after a transaction has released any lock it is not allowed to acquire any new locks. This is known as two-phase locking. To provide a strict transaction service that avoids cascading aborts, a transaction cannot release any locks until it commits or aborts. This is known as strict two-phase locking. To increase concurrency with locking, it is useful to view operations in transactions as either reads or writes. Two transactions reading the same data do not conflict, while one reading and one writing, or both writing, do conflict. Concurrency can be increased by the locking service offering both read locks and write locks. Lock services can be implemented using the distributed mutual exclusion techniques of Section 2.3.1.

The use of locks can lead to deadlock: two or more transactions blocked waiting for each other. For example, transaction a may be waiting for item x held by transaction b while transaction b is waiting for item y held by transaction a. Deadlocks can be prevented by having a transaction lock all the data items it needs at the beginning of the transaction, although this reduces concurrency. A better approach is either to detect cycles in a group of transactions waiting for each other, or to use a timeout to detect deadlocked transactions. The deadlock can be removed by aborting one of the transactions.

Two-version locking and hierarchic locks are designed to increase concurrency in locking schemes. Two-version locking [25] delays setting exclusive locks until the transaction commits by introducing a commit lock. In two-version locking, a commit lock conflicts with any other lock, and a write lock conflicts with another write lock.
As the commit period is usually relatively short, concurrency is increased. Hierarchic locks [27] use locks of different granularities. Setting a parent lock sets all the lower-level child locks. Setting a child lock sets an intention lock on the parent lock.

Optimistic concurrency control does not check for conflicts until the transaction is committed [39]. The assumption is that the probability of two transactions conflicting is low, and therefore the overhead of locking is not required. Each transaction goes through three phases: read, validation, and write. During the read phase, transactions read the most recently committed versions and write tentative versions. During the validation phase, this transaction is checked against any other transactions for conflicts. If the transaction does not fail the validation phase, the tentative versions are made permanent in the write phase. Two types of validation are possible: forward or backward [29]. Backward validation determines if the items read by this transaction overlap with any items written by earlier overlapping transactions. Forward validation determines if the items written by this transaction overlap with any items read by active overlapping transactions.

Timestamp ordering controls access to data items according to a unique timestamp assigned to each transaction [5]. Each access to a data item is validated, and if the access cannot be validated, the transaction is aborted. The timestamps are used to totally order transactions. A write operation is valid if the data item being written was last read and written by earlier transactions. A read operation is valid if the data item was last written by an earlier transaction. If each transaction uses tentative versions of the data, transactions must wait for earlier transactions to commit their writes before performing reads.
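The two validity rules for timestamp ordering can be sketched in Python as follows. This is a simplified version under stated assumptions: writes are applied immediately rather than kept as tentative versions, and each data item records only the largest read and write timestamps seen so far. The names `Item`, `Abort`, `read`, and `write` are hypothetical.

```python
class Abort(Exception):
    """Raised when an access cannot be validated; the transaction aborts."""


class Item:
    def __init__(self) -> None:
        self.value = None
        self.max_read = 0    # largest timestamp of a transaction that read the item
        self.max_write = 0   # timestamp of the transaction that last wrote the item


def write(item: Item, ts: int, value) -> None:
    # Valid only if the item was last read and last written by earlier transactions.
    if ts < item.max_read or ts < item.max_write:
        raise Abort(f"write by transaction {ts} is too late")
    item.value, item.max_write = value, ts


def read(item: Item, ts: int):
    # Valid only if the item was last written by an earlier transaction.
    if ts < item.max_write:
        raise Abort(f"read by transaction {ts} is too late")
    item.max_read = max(item.max_read, ts)
    return item.value


x = Item()
write(x, 1, "a")          # transaction 1 writes x
print(read(x, 3))         # transaction 3 reads "a"
try:
    write(x, 2, "b")      # transaction 2 writes after a later transaction read x
except Abort as err:
    print("abort:", err)  # abort: write by transaction 2 is too late
```

Transaction 2 is aborted rather than delayed because allowing its write would contradict the timestamp order that transaction 3's read already assumed.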
Timestamp ordering is implemented by each data item having a write timestamp, a set of tentative versions with their associated write timestamps, and a set of read timestamps [14]. In multiversion timestamp ordering [50], each data item additionally has a set of committed versions and their associated write timestamps. A read that arrives late can be satisfied from an old version.

Transactions may be structured as a set of nested transactions to increase concurrency and improve failure handling. Concurrency is increased by transactions nested at the same level executing concurrently. Failure handling is improved by the independent commit or abort of nested transactions. An aborted child transaction does not mean the parent transaction must abort: the parent transaction may attempt to recover from the failure of a child transaction. However, a child transaction cannot commit until the parent transaction commits.

Distributed transactions build on the ideas given above. In order to make distributed transactions atomic and durable, an atomic commit protocol, commonly the two-phase commit protocol, is needed. The simple commit protocol from single-server transactions is inadequate because, using this protocol, no server can independently decide to abort a transaction. In the two-phase commit protocol [27], one server is the coordinator for each transaction. This server knows of all other servers involved in the transaction, and all other servers involved know the coordinator. This can be implemented by each new server contacting the coordinator upon joining a transaction. The coordinator is responsible for carrying out the two-phase commit protocol. In the first phase, the coordinator asks each server involved in the transaction if it is prepared to commit. In the second phase, the coordinator instructs each server to commit (or abort) the transaction. By the time a server has said it is prepared to commit, the effects of the transaction must be saved in permanent storage.
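The coordinator's side of the two-phase commit protocol can be sketched in Python as follows. This is a simplified, failure-free sketch: votes are collected by direct calls rather than messages, there are no timeouts, and saving to permanent storage is modelled only by a status change. Class and function names are hypothetical.

```python
from typing import List


class Participant:
    def __init__(self, name: str, can_commit: bool = True) -> None:
        self.name = name
        self.can_commit = can_commit
        self.status = "active"

    def prepare(self) -> bool:
        # Phase 1 vote. A real server votes yes only after the transaction's
        # effects are in permanent storage (modelled here by the status change).
        if self.can_commit:
            self.status = "prepared"
            return True
        self.status = "aborted"   # a server may decide to abort independently
        return False

    def finish(self, decision: str) -> None:
        self.status = decision


def two_phase_commit(participants: List[Participant]) -> str:
    """Coordinator: commit only if every participant votes yes."""
    votes = [p.prepare() for p in participants]          # phase 1: collect votes
    decision = "committed" if all(votes) else "aborted"
    for p in participants:                               # phase 2: announce outcome
        p.finish(decision)
    return decision


ok = [Participant("A"), Participant("B")]
bad = [Participant("A"), Participant("B", can_commit=False)]
print(two_phase_commit(ok), two_phase_commit(bad))   # committed aborted
```

A single "no" vote is enough to abort the whole transaction, which is exactly the independent abort that the single-server commit protocol cannot express.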
To make data recoverable from failure during the two-phase commit protocol using logging, the recovery file must also store the coordinator and the status of the transaction: prepared, committed, or aborted. To make data recoverable from failure during the two-phase commit protocol using shadow versions, an additional file containing each transaction's status, coordinator, and intentions list must be used. Upon recovery, the recovery manager treats any committed transactions in the same manner as in the single-server case. The recovery manager must contact the coordinator for any prepared transactions to determine if the transaction should be committed or aborted. With nested transactions, the two-phase commit protocol must be extended to include the child transactions of each nested transaction [65].

Concurrency control techniques such as locking, optimistic concurrency control, and timestamp ordering can be extended to distributed transactions, and must be able to detect and recover from distributed deadlock. In distributed transactions, each server is responsible for controlling concurrent access to its own data. With locking, each server must maintain all of a transaction's locks until the two-phase commit protocol has completed. With optimistic concurrency control, after a local validation, a global validation is made that checks for conflicts between servers [13]. Alternatively, a globally unique number can be assigned to each transaction at the start of validation and used to order the transactions [54]. With timestamps, serial equivalence is achieved by using a globally unique timestamp. This can be achieved by attaching a server id to each timestamp. The transactions are ordered first by their timestamp and second by their server id. This requires that the timestamps of different servers are roughly synchronized.

Distributed deadlocks occur when two or more transactions on different servers are blocked waiting for each other.
Distributed deadlocks may occur even when there is no deadlock within any one server. A timeout can be used to detect when a distributed deadlock has occurred. Unfortunately, it is difficult to set the timer value correctly: if the timer value is too small, transactions that are not deadlocked may be aborted; if the timer value is too large, transaction response will be sluggish. A better approach is to detect cycles of transactions waiting for each other on different servers. The easiest solution is to use a central deadlock-detection server that periodically looks for cycles. However, the central server approach is both a potential performance bottleneck and a failure exposure.

A distributed approach to distributed deadlock detection uses edge chasing [4]. When a transaction starts waiting for another transaction that is itself waiting for a lock at another server, a server sends a probe containing the wait-for relationship to the server with the lock. If the lock is shared, the probe is sent to the servers of all the transactions holding it. Upon receiving a probe, a server checks to see if the transaction holding the lock is also waiting. If it is, the server adds this transaction to the wait-for relationship in the probe, checks if a cycle has now been created, and forwards the probe as above. If a server detects that a cycle has been created, a distributed deadlock has been detected, and the cycle must be broken by aborting a transaction in the cycle. A potential problem with this approach is that several servers may independently conclude that deadlock exists and each abort a transaction, with the result that more transactions than necessary to break the deadlock are aborted. This can be solved by assigning priorities to transactions and always aborting the transaction with the lowest priority [61] [18].

Transactions have been implemented in many systems, for example the Argus [42] and Arjuna [59] systems.
Argus provides a programming language and environment with explicit support for transactions. Resources are encapsulated in objects called guardians, with the resources accessed through procedures called handlers. Each guardian resides at a single node and contains persistent objects known as stable objects. Argus supports atomic transactions known as actions and uses a two-phase commit protocol. Argus assumes nodes have crash failure semantics, and that the network may partition. Arjuna provides much the same support for transactions as Argus, except that Arjuna provides this support using a C++ class hierarchy.

Transactions can be considered as a special case of the problem of providing fault tolerance for groups of processes communicating by message passing. In the general problem, applications are not structured as units of work (transactions) sharing a set of data to which concurrency control has been applied. Solutions to the general problem typically maintain checkpoints and message logs for each process, with recovery to a consistent distributed state by appropriate rollback of processes to a checkpoint and selective replaying of logged messages. These techniques can be pessimistic, where checkpoints are synchronous with communication [3] [37], or optimistic, where checkpoints are asynchronous [62]. These techniques are also useful for other problems in distributed systems, such as deadlock recovery and distributed debugging [64].

2.3.5  Distributed Transactions With Replication

Distributed transactions can be combined with replication in order to increase availability and potentially increase performance. The replicated transactional service should appear to the user the same as a non-replicated service. The property of one-copy serializability must be maintained between the replicas: the effect of transactions performed on replicated data is the same as if they had been performed on a single copy of the data.
Recovery of a server in a replicated transaction service is more complex because, while the server is down, the other replicas continue to perform updates to the data. In addition, a transaction must be serialized with respect to failures of replica managers: any failed replica managers must appear to fail before or after the transaction. In distributed transactions with replication, the two-phase commit protocol becomes a nested two-phase commit protocol: each replica manager for a data item will communicate with all other replica managers for that data item before preparing or aborting the transaction during the first phase of the two-phase commit protocol.

Using the available copies method of replication, each replica manager performs concurrency control locally: read operations are performed by one replica manager, and write operations are propagated to all replica managers. This ensures one-copy serializability as long as no replica manager fails or recovers during concurrent transactions. To guarantee serial equivalence in spite of possible failures and recoveries, an extra validation phase called local validation is required [19]. This is used to ensure that any failure or recovery does not appear to occur during a transaction. Before a transaction commits, any replica managers used by that transaction are checked for failures, and any replica managers the transaction found to have failed are checked to see if they have recovered. If either of these is true, one-copy serializability cannot be ensured and the transaction is aborted.

Using caching with distributed transactions can be considered a form of replication. [24] provides a taxonomy and performance evaluation of transactional client-server cache-consistency algorithms. Two key properties of these algorithms are dynamic replication and second-class ownership. Dynamic replication means that data is cached based on the run-time behaviour of clients.
Second-class ownership means that the data stored at clients is not considered to be equal to the data stored at servers. The primary distinction in the taxonomy is between avoidance and detection schemes. Avoidance schemes prevent any client from accessing stale data by ensuring all cached data is valid; this is a read-one/write-all approach. Detection schemes detect whether a transaction has accessed stale cached data during the transaction. Avoidance schemes are further categorized based on when a transaction initiates consistency actions, how long a transaction maintains write permission, priority in remote conflicts, and how a remote update notification is handled. Detection schemes are further categorized based on when during the transaction the validity of the data is checked, how notification of changed data occurs, and how such a notification is handled. Performance results indicate that avoidance-based schemes perform best in pessimistic environments, and detection-based schemes perform best in optimistic environments.

2.4  Fine-Grained Object Mobility

Adding fine-grained object mobility to a distributed system adds complexity to creating fault-tolerant applications. In a stand-alone system, it is usually reasonable to assume crash failure semantics: either the application is working, or it has failed. With a distributed implementation, it is usually reasonable to assume crash failure semantics for the components of the distributed application: certain parts of the application fail while other parts of the application continue to run. For example, a server in a multiple-server application may fail, leaving the other servers and possibly clients responsible for recovering from, or at least masking, the failure. This is a well-understood area in distributed systems, solved using the replication and transaction techniques described above. As discussed, these techniques typically assume crash failure semantics.
With the introduction of fine-grained object mobility to a distributed system, it is no longer reasonable to assume crash failure semantics for the components of the distributed application. Components participating in a distributed system may in fact reside on multiple nodes. Moreover, the nodes on which a component resides may change dynamically as the application executes. Whereas the move from stand-alone to distributed systems creates the possibility that components of the application, such as a server, can fail, the move from distributed systems to distributed systems with fine-grained object mobility creates the possibility that much smaller and more interdependent pieces of a component can fail. For example, upon a node failure, some thread that is part of a server may be lost. The server will continue operating, expecting the thread to be active, perhaps updating a data structure shared with other threads in the server. During the period in which the server is still operating yet has failed because of the lost thread, it can generate erroneous results, and therefore exhibit arbitrary or Byzantine failure semantics.

It may seem reasonable to apply the standard techniques of transactions and replication to distributed systems with fine-grained object mobility. The question is: at what granularity of the system should these techniques be applied? It is incorrect to apply them at the level of the components that make up the distributed application, as it cannot be assumed these components have crash failure semantics. Thus, if a programmer wants to take advantage of fine-grained object mobility in a distributed application and provide fault tolerance, replication and/or transactions must be applied at the level of the individual objects within the component. With replication, each and every object must be replicated. With transactions, each and every access to another object must be treated as a distributed transaction.
These approaches are unsuitable for reasons of both programming complexity and performance overhead.

Another approach is to use exceptions when an object is unavailable and recover from this exception. In Emerald, this is the unavailable or failure handler described in Section 1.2. With this approach, every invocation on another object is wrapped by an unavailable handler. When an object is found to be unavailable, the application can take appropriate recovery action in the handler. This approach turns out to be complex, error prone, and difficult to test. It is complex because failure code has to be included on every invocation of an object in case that object is unavailable, and also because handling a failed object at some arbitrary point in the execution is difficult. It is error prone because of the number of situations where an object may fail. It is difficult to test because of the unpredictability of failures: it is difficult to determine whether all possible scenarios have been tested.

Another approach is for the programmer to explicitly program the location of all objects in the component. In Emerald, this can be achieved using the move, fix, and attach keywords described in Section 1.2. Using these keywords, the programmer can ensure that objects are always moved together and fixed at certain locations, and then create failure handlers as described above where cross-node invocations occur. This technique removes some of the complexity, described above, of using simple failure handlers on every invocation, but it is still complex, error prone, and difficult to test.

What is required is a mechanism that lets the programmer of distributed applications keep the advantages of fine-grained object mobility while converting the arbitrary failure semantics of such systems to crash failure semantics, so that standard distributed-system fault-tolerance techniques can be applied.
In order to be most useful, this mechanism should:

• Provide crash failure semantics for a component,
• Notify any interested objects on the failure of a component,
• Allow all objects in the component to be moved as a single object,
• Be simple to use,
• Allow arbitrary and unrestricted movement of objects in the component,
• Not require any special interfaces from objects in the component,
• Remain internally consistent in spite of failures,
• Have low overhead in the normal case of no failures, and
• Correctly handle network partitions.

Chapter 3  Group Service

3.1  Introduction

In order to create reliable applications in a distributed system with fine-grained object mobility, a method is needed, as discussed in Section 2.4, to transform failures from arbitrary to crash failure semantics. This chapter discusses a group service, implemented in the Emerald language and environment, that provides this capability.

The group service is conceptually simple, representing software components as groups of objects. The programmer of the component creates an initially empty group representing the component, and as objects are created or join the component, the programmer adds them to the group. The group service ensures that the members of a group will either all be available or all be failed: crash failure semantics are created from the usual arbitrary failure semantics. This is implemented by the group service noting each failed node in the distributed system and determining whether any members of a group were on the failed node. If any members of a group were on a failed node, the group is considered to have failed and all objects in the group are marked as failed. One implication of this model is that programmers creating groups do not need to handle unavailable objects when performing invocations between objects in a group. Since a group member is able to perform the invocation, it must be the case that the object being invoked is available.
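The failure rule of the group service can be modelled in a few lines of Python. This is a hypothetical toy model, not the Emerald implementation: it tracks only which node each member currently resides on, and applies the rule that the failure of any node hosting a member fails the whole group. The names `GroupModel` and `node_failed` are illustrative.

```python
from typing import Dict, List


class GroupModel:
    """Toy model of the group failure rule (hypothetical, not Emerald code)."""

    def __init__(self) -> None:
        self.location: Dict[str, str] = {}   # member name -> node it resides on
        self.failed = False

    def add(self, member: str, node: str) -> None:
        self.location[member] = node

    def move(self, member: str, node: str) -> None:
        self.location[member] = node         # members may move freely

    def available(self, member: str) -> bool:
        return member in self.location and not self.failed


def node_failed(groups: List[GroupModel], node: str) -> List[GroupModel]:
    """If any member of a group was on the failed node, fail the whole group."""
    newly_failed = []
    for g in groups:
        if not g.failed and node in g.location.values():
            g.failed = True                  # every member becomes unavailable together
            newly_failed.append(g)
    return newly_failed


server = GroupModel()
server.add("dispatcher", "n1")
server.add("worker", "n1")
server.move("worker", "n2")                  # the component now spans two nodes
node_failed([server], "n2")
print(server.available("dispatcher"))        # False, although n1 is still running
```

The dispatcher is reported unavailable even though its own node survived: the component never runs in a partially failed state, which is what turns arbitrary failures into crash failures.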
This service is accessed through a new Emerald type called Group. An object in a group is called a member of that group. Members of a group do not require any special interfaces and are allowed to move freely in the distributed system. The group service provides three fundamental services: reliability, notification, and location.

Reliability allows the programmer to treat all members of a group as a single logical component, as discussed above, residing on one or many nodes. Failures are assumed to occur at the granularity of a node: a node is either available or unavailable, never partially available. After the failure of a node containing a group member, all surviving members of the group are marked as unavailable once the current operation on the group, if any, completes. This technique correctly handles network partitions in which the members of a group are split into two or more partitions (but see Section 3.5). When a group is partitioned, the group service marks all of its members as unavailable in every partition.

Notification is a mechanism that uses a callback to notify objects outside the group when the group fails. Multiple listeners are allowed per group. When a group fails, the group service first marks all members of the group as unavailable and then notifies all listeners for that group of the failure. The application using the group service can perform application-specific recovery upon notification of failure, for example restarting servers or aborting transactions. Group listener objects are required to be immutable. This ensures that a listener is not itself affected by the failure of one of the nodes participating in the group. An additional notification mechanism uses a callback to notify objects outside the group of the start and failure of nodes.
Applications can use this information to perform additional application-specific tasks, for example creating a replica on a new node. As with group listener objects, node listener objects are required to be immutable.

Location allows the application to explicitly control the location of groups. For example, an application can create a server replica as a group of objects and move it to a node distinct from the original server. The fix semantics provided by the group service are a "soft" version of fix: the group service simply applies the location operation to each of the group members in turn. In addition, the location services will not violate previous location operations. For example, a member of a group that has previously been fixed at a node will not be moved by a moveAll operation (it will be moved by a refixAll operation). It is still possible to move members of the group individually.

3.2 Interface

3.2.1 Group Class

Group is a type with the following interface:

Creation:

operation create -> [Group]
Create an empty group object.

3.2.2 Group Object

An object whose type is Group has the following interface:

Management:

operation addMember [o : Any] -> [r : Boolean]
Adds o to this group. An object may be in at most one group. Returns true on success, false on failure. The operation will fail if the object is already a member of another group, if the object has type Group, or if this group has failed. (Immutable objects are never added to groups, because immutable objects are never unavailable. This is transparent to the programmer: the addMember operation will return true for an immutable object if the group has not failed.)

operation removeMember [o : Any] -> [r : Boolean]
Removes o from this group. Returns true on success, false on failure. The operation will fail if the object is not a member of this group, or if the group has failed.

operation listMembers -> [l : Vector.of[Any]]
Returns a vector listing all current members of this group.
Returns nil if this group has failed.

operation listMemberNodes -> [l : Vector.of[Node]]
Returns a vector listing all nodes that currently have members of this group. Returns nil if this group has failed.

operation failed -> [r : Boolean]
Returns true if the group has failed, false otherwise.

Notification:

operation addGListener [l : GListener] -> [r : Boolean]
Adds l as a listener to be notified upon failure of this group, using a callback as defined by the GListener type. Returns true on success, false on failure. The operation will fail if this group has failed.

operation removeGListener [l : GListener] -> [r : Boolean]
Removes l as a listener for this group. Returns true on success, false on failure. The operation will fail if l is not a listener for this group, or if this group has failed.

operation listGListeners -> [l : Vector.of[GListener]]
Returns a vector listing all current listeners for this group. Returns nil if this group has failed.

GListener represents the type of objects that are informed via a callback when a group becomes unavailable. Such objects must conform to the type GListener, which is:

immutable typeobject GListener
  operation groupUnavailable
end GListener

operation addNListener [l : NListener] -> [r : Boolean]
Adds l as a listener to be notified upon the start or failure of a node, using a callback as defined by the NListener type. Returns true on success, false on failure. The operation will fail if this group has failed.

operation removeNListener [l : NListener] -> [r : Boolean]
Removes l as a node listener for this group. Returns true on success, false on failure. The operation will fail if l is not a node listener for this group, or if this group has failed.

operation listNListeners -> [l : Vector.of[NListener]]
Returns a vector listing all current node listeners for this group. Returns nil if this group has failed.
NListener represents the type of objects that are informed via a callback upon the start or failure of a node. Such objects must conform to the type NListener, which is:

immutable typeobject NListener
  operation nodeUpDown [n : Node, up : Boolean]
end NListener

operation listListenerNodes -> [l : Vector.of[Node]]
Returns a vector listing all current nodes where listeners may be notified. By default, these are all nodes that have had or currently have group members, or that have invoked the Group object. Returns nil if this group has failed.

operation addListenerNode [n : Node] -> [r : Boolean]
Adds n as a node where listeners may be notified. Returns true on success, false on failure. The operation will fail if this group has failed.

Location:

operation moveAll [n : Node] -> [r : Boolean]
Applies the move to n operation to all members of this group. Returns true on success, false on failure. This operation will fail if the group has failed, or if some of the members of the group cannot be moved (for example, if one of the members is fixed at another location). If some members cannot be moved, as many members as possible are still moved.

operation fixAll [n : Node] -> [r : Boolean]
Applies the fix at n operation to all members of this group. Returns true on success, false on failure. This operation will fail if the group has failed, or if some of the members of the group cannot be fixed (for example, if one of the members is fixed at another location). If some members cannot be fixed, as many members as possible are still fixed.

operation refixAll [n : Node] -> [r : Boolean]
Applies the refix at n operation to all members of this group. Returns true on success, false on failure. This operation will fail if the group has failed, or if some of the members of the group cannot be refixed (for example, if one of the members is not fixed). If some members cannot be refixed, as many members as possible are still refixed.
operation unfixAll -> [r : Boolean]
Applies the unfix operation to all members of this group. Returns true on success, false on failure. This operation will fail if the group has failed.

3.3 Implementation

The application's interface to the group service is the immutable group object described in Section 3.2. The group service is composed of two primary objects: the group manager (GManager) and the group representative (GRep). A GManager exists on each Emerald node where the group service is in use. It is responsible for creating, locating, and notifying all GReps that exist on that node. A GRep exists on each node for each group active on that node. It is responsible for maintaining group state information and performing operations on that group. Each group is uniquely identified by a group identifier (gid). Several other objects are used, including a GLocks object that provides locking services and a GElection object that elects a coordinator.

Most of the group service is implemented in Emerald, with a small number of changes to the Emerald runtime implemented in C. The changes to the Emerald runtime are:

• the addition of Group as a new builtin type;
• initialization of the group manager on node creation;
• upcalls to the group manager on node failures;
• upcalls to the group manager on moves of group members;
• a new call that allows the group object to locate the local GManager; and
• a new call that allows an Emerald program to mark an object as unavailable.

[Figure 3.1: Group Service]

Figure 3.1 shows an example of the group service with two active groups identified by gids 1 and 2. Gid 1 is active on nodes B, C and D. Gid 2 is active on nodes B and D.
No group is active on node A. A challenge in implementing the group service is keeping it internally consistent in spite of failures in a multi-threaded environment.

3.3.1 Group Object

The immutable group object is returned from a Group.create operation. The object is created as immutable to guarantee that it is always available. The application may freely move references to the group object between nodes. Because the object is immutable, a copy of it is created on each node where it is referenced.

When the group object is created, its initially section attempts to find the local GManager for this node. If one is not found, one is created. Next, the GManager is invoked to create a GRep for this group on the local node. This GRep will exist on the node until the group fails. Finally, the initially section queries the newly created GRep for the gid and stores it in the group object.

The group object is the application's interface to the group service. When invoked, it locates the appropriate GRep for the group and dispatches the call. Because the group object may be invoked on different nodes, it must search for the appropriate GRep on each invocation. When searching a node for a GRep, the group object first finds the GManager, if any, for that node. If a GManager exists on that node, the group object queries it for the GRep by gid. If no GManager is found, one is created. If no GRep is found on the specified node, all nodes with active GManagers are searched. If no GRep is found on any of these nodes, the group is assumed to have failed. If a GRep is found, a new GRep is created on the specified node and passed a reference to the found GRep.

For all operations, either a read or a write lock is acquired for the gid at the beginning of the call. The node where the invocation was performed is searched for a GRep for this gid.
If one is found, the group object invokes it to acquire the appropriate lock as described in Section 3.3.3. All operations except unfixAll, failed, and the various list operations update distributed GRep state and therefore require write locks. For example, the removeMember operation notifies all other GReps of the removed member and of whether the local GRep still has members. The unfixAll, failed, and list operations simply read local GRep state and therefore use read locks. For example, the listMembers operation returns a list of members from the local GRep. For the addMember and removeMember operations, once the lock is held, the group object searches the node where the group member is located for a GRep for this gid and dispatches the call to that GRep. For all other operations, once the lock is held, the call is dispatched to the GRep on the node where the invocation was performed.

To improve the performance of call dispatch, the group object caches a hint for the last known location of both an initialized GManager and a GRep for this group. Both hints tend towards the node where the group object is currently located. Because these references are cached, most invocations avoid a global search. A global search occurs only when the group object is invoked on a node where it has not been invoked before.

3.3.2 GManager Object

The GManager is responsible for creating, locating, and notifying all GReps that exist on a node. On node creation, the group service is initialized with a small monitored object that synchronizes startup access to the GManager. The monitored object can be queried for the GManager, with an option to create one. Once the GManager is created, access to each GManager is synchronized through local locks using the GLocks object (see Section 3.3.4).

Each GManager has a reference to all other GManagers on nodes currently active in the group service. Each GManager also maintains a current coordinator GManager.
The coordinator is responsible for generating unique gids when GReps are created, and is maintained using a GElection object (see Section 3.3.4). On creation, a GManager searches all nodes for an existing GManager. If one is found, the new GManager queries it for the current coordinator. Otherwise, the new GManager assumes that it is the coordinator. To improve performance on startup, the GManager create operation can be passed an existing GManager as a hint. This hint is tried first, and only if it is unavailable is a global search performed.

When a new GRep is created, the current GManager queries the coordinator for a new gid and creates a local GRep. If the GRep is a local representative for an already existing group, the GManager creates a GRep and passes it a reference to an existing GRep for the group. The GManager stores a reference to the new GRep to answer future queries, and removes it if the group fails.

The GManager is notified of any node failures through an Emerald runtime upcall. The Emerald runtime detects failures of other nodes through TCP/IP connection failure. On a failure, the GManager first notifies the GElection object, which chooses and initializes a new coordinator if necessary. The GManager then informs each GRep of the node failure.

Each GManager also maintains a hash table mapping local group members to GReps. The GReps are responsible for adding and deleting members in the hash table. The Emerald runtime detects the move of a group member by checking a bit in the object's flags upon moves. If the object is a group member, the Emerald runtime performs an upcall to the GManager with the object being moved, the destination of the move, and the type of move (move, fix, or refix). The GManager uses the hash table to determine the GRep affected by the move, and dispatches the move to that GRep.
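The hint-first lookup described above tries a cached reference before falling back to a search of all nodes. The group object uses the same pattern for its cached GManager and GRep hints (Section 3.3.1). The following is a minimal Python sketch, assuming a caller-supplied availability check. `find_with_hint` and the node-named GManager strings are hypothetical and do not come from the thesis.

```python
from typing import Callable, Iterable, List, Optional, TypeVar

T = TypeVar("T")


def find_with_hint(hint: Optional[T],
                   is_available: Callable[[T], bool],
                   global_search: Callable[[], Iterable[T]]) -> Optional[T]:
    """Try a cached hint first; fall back to a global search only if it fails."""
    if hint is not None and is_available(hint):
        return hint                       # common case: no global search
    for candidate in global_search():     # e.g. visit every node
        if is_available(candidate):
            return candidate
    return None                           # nothing found anywhere


# Hypothetical usage: GManagers are named by node, and "mgr@B" has crashed.
live = {"mgr@A", "mgr@C"}
searches: List[str] = []


def search_all_nodes() -> List[str]:
    searches.append("global")             # record each expensive search
    return ["mgr@B", "mgr@C"]


first = find_with_hint("mgr@A", lambda m: m in live, search_all_nodes)
second = find_with_hint("mgr@B", lambda m: m in live, search_all_nodes)
print(first, second, len(searches))       # mgr@A mgr@C 1
```

Only the call with a stale hint pays for the global search. This is why caching hints that follow the group object keeps most invocations local.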
Details of moving a group member, including handling failures during moves, are discussed in Section 3.3.3.

3.3.3 GRep Object

A GRep exists on each node for each group active on that node, and each group is uniquely identified by a gid. As shown in Figure 3.1, the set of GReps for each group can be logically viewed as a single service supporting that group. The GRep is the most complex object in the group service. It enforces crash failure semantics for the group on node failures, maintains the state of the group, and implements the Group object interface. The group state each GRep maintains includes:

• all members of the group,
• the number of members of the group on this node,
• all group listeners for the group,
• all node listeners for the group,
• all other GReps for the group, and
• whether the other GReps currently have local members.

Each group also has a current coordinator GRep, maintained using a GElection object (see Section 3.3.4). The coordinator synchronizes access to the gid and is responsible for notifying all GReps when the group is deemed to have failed. Dealing with the failure of the coordinator is discussed below.

On creation, a GRep is passed a reference to another GRep. If this reference is nil, the GRep is the first GRep for a new group. Otherwise, the GRep is an additional GRep for an already existing group. In that case, it contacts the coordinator for the group and initializes its group state from the coordinator. If the coordinator fails while the GRep is initializing, the initialization of the GRep fails.

When a member is added to the group using the addMember operation, the GRep first performs a test-and-set of a flag in the header of the new member. The flag is already set only if the object is already a group member. If the object is not already a group member, the flag is set and the GRep informs all GReps for this gid of the new member, updating state information as necessary.
Recall that the group object acquires and frees the appropriate lock. The coordinator uses the GLocks object (see Section 3.3.4) to synchronize access to the group. Before performing any read or write operation on the group, the group object acquires a read or write lock from the coordinator. The use of a distributed lock adds two further considerations. First, the locking service must correctly handle the failure of a lock requester before the requester has acquired the lock. This is handled using the ping capability of the GLocks object: when a lock is about to be granted to a requester, the requester is first pinged to ensure it is still available. Second, the locking service must correctly handle the failure of a lock holder. This leads to the more general issue of failure handling in the GRep.

Each GRep is informed of a node failure by the local GManager. Upon receiving a failure notification, a GRep informs the GElection object. The GElection object determines whether there is a GRep on the failed node. If there is, the GElection object elects a new coordinator (if necessary) and returns the identity of the failed node. The coordinator uses the failed node to determine whether the failed GRep had local members. If it did, the group has failed. The coordinator ensures that any locks held by the failed node are freed, and acquires a write lock for the gid. The GLocks steal mechanism gives this write lock priority over all other invocations currently waiting for read or write locks. If the group has not already been found to have failed, the coordinator next performs a state consistency check among all surviving GReps. This check verifies that all GReps agree on the number of members, listeners, and nodes currently in the group, and that all local members of the group are correctly marked as such. If these values are inconsistent, the group has failed.
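The lock behaviour described above has two parts. A requester is pinged just before it is granted the lock, and failure handling can steal the front of the queue. Both can be sketched as a single-process Python model. `GLocksModel` and its method names are hypothetical. The model assumes one FIFO queue of read and write requests and a synchronous `ping` callback standing in for a remote liveness check.

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable, Deque, List, Optional, Set


@dataclass
class Request:
    requester: str
    write: bool


class GLocksModel:
    """Single-process model of a readers/writer lock with ping and steal."""

    def __init__(self, ping: Callable[[str], bool]) -> None:
        self.ping = ping                      # liveness check on a requester
        self.queue: Deque[Request] = deque()  # assumption: one FIFO queue
        self.readers: Set[str] = set()
        self.writer: Optional[str] = None
        self.granted: List[str] = []          # grant order, for inspection

    def request(self, requester: str, write: bool, steal: bool = False) -> None:
        r = Request(requester, write)
        if steal:
            self.queue.appendleft(r)          # failure handling jumps the queue
        else:
            self.queue.append(r)
        self._grant()

    def release(self, requester: str) -> None:
        if self.writer == requester:
            self.writer = None
        else:
            self.readers.discard(requester)
        self._grant()

    def _grant(self) -> None:
        while self.queue and self.writer is None:
            r = self.queue[0]
            if r.write and self.readers:
                return                        # writer waits for readers to drain
            self.queue.popleft()
            if not self.ping(r.requester):
                continue                      # requester died while waiting
            if r.write:
                self.writer = r.requester
            else:
                self.readers.add(r.requester)
            self.granted.append(r.requester)


alive = {"n1", "n3", "coord"}                 # "n2" has crashed while queued
locks = GLocksModel(ping=lambda who: who in alive)
locks.request("n1", write=True)               # granted immediately
locks.request("n2", write=False)              # queued behind the writer
locks.request("n3", write=True)               # queued
locks.request("coord", write=True, steal=True)  # failure handling: to the front
locks.release("n1")                           # coord gets the lock next
locks.release("coord")                        # n2 fails its ping; n3 is granted
print(locks.granted)                          # ['n1', 'coord', 'n3']
```

Pinging at grant time rather than at request time matters because a requester can fail at any point while it waits in the queue.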
If the coordinator concludes that the group has failed, it informs all GReps to make all members of the group unavailable and notifies all group listeners of the failure of the group. It then informs all GReps to have their local GManager remove its reference to the GRep. The coordinator is also responsible for notifying node listeners on the start or failure of a node. Notification of a node failure occurs only after all other group failure handling has completed.

When a group member is moved using Emerald keywords such as move or attach, the GManager informs the appropriate GRep for the member as described above. The GRep acquires a write lock for this gid and updates the state of both the source and destination GReps to reflect the current location of the group member. Failures during the move are handled by setting both the source and destination GReps to include the member until the move completes successfully. The failure of either the source or the destination during the move therefore results in the group failing.

The moveAll, fixAll, and refixAll operations share the same sequence of invocations, differing only in the actual move, fix, or refix operation. The same operations are also used to implement the moves of individual members. Failures during these moves are handled in the same way, by setting both the source and destination GReps to include members until the move completes.

To reduce message transmission, the addMember and removeMember operations piggyback whether the GRep has members on the notification of the new or removed member. Other notifications indicating whether a GRep has members are sent only when the node changes from having no members to having some, or from having some to having none. All operations ignore failures during invocations of remote GReps and perform the operation on as many GReps as possible. Any node failure is then handled as described above.
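The coordinator's handling of a single node failure, as described above, can be summarised as a Python sketch. It is a hypothetical model rather than the thesis's Emerald code. `Rep`, `handle_node_failure`, and the log strings are illustrative. The state consistency check is reduced to comparing (members, listeners, nodes) counts, and at least one GRep is assumed to survive.

```python
from dataclasses import dataclass, field
from typing import List, Set, Tuple


@dataclass
class Rep:
    """One GRep for a group, as seen by the coordinator (hypothetical model)."""
    rep_id: int                             # GElection identifier
    node: str
    local_members: Set[str] = field(default_factory=set)
    view: Tuple[int, int, int] = (0, 0, 0)  # (members, listeners, nodes) seen here
    unavailable: Set[str] = field(default_factory=set)


def handle_node_failure(reps: List[Rep], failed_node: str, coordinator: int,
                        log: List[str]) -> Tuple[int, bool]:
    """Return (coordinator id, group failed?) after one node failure."""
    lost = [r for r in reps if r.node == failed_node]
    survivors = [r for r in reps if r.node != failed_node]  # assumed non-empty
    group_failed = False
    if lost:
        # GElection: if the coordinator was lost, pick the highest surviving id.
        if coordinator in {r.rep_id for r in lost}:
            coordinator = max(r.rep_id for r in survivors)
            log.append(f"elected {coordinator}")
        # An in-flight move lists the member at both ends, so losing either
        # end counts as losing a member.
        group_failed = any(r.local_members for r in lost)
        # Otherwise check that the survivors agree on the group state.
        if not group_failed and len({r.view for r in survivors}) > 1:
            group_failed = True
        if group_failed:
            for r in survivors:
                r.unavailable |= r.local_members   # mark members unavailable
            log.append("notify group listeners")
            log.append("drop GRep references")
    log.append(f"notify node listeners: {failed_node} down")  # always last
    return coordinator, group_failed


# Member "m" is moving from A to B, so both GReps list it; then B crashes.
reps = [Rep(1, "A", {"m"}, (1, 0, 3)), Rep(2, "B", {"m"}, (1, 0, 3)),
        Rep(3, "C", set(), (1, 0, 3))]
log: List[str] = []
print(handle_node_failure(reps, "B", coordinator=2, log=log))  # (3, True)
print(log)
```

In the example, both the source and destination GReps list member `m` while its move is in progress. The crash of the destination node therefore fails the group even though `m` has not yet arrived there.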
3.3.4 Consistency and Data Structure Objects

Several other objects support the group service. The GLocks and GElection objects provide consistency services, and the GList, GArray, and GHashTable objects provide data structures.

The GLocks object supports a multiple-readers, single-writer lock. To handle failures of lock requesters, the GLocks object provides a ping mechanism: when a lock is about to be granted to a requester, the requester is first pinged to ensure it is still available. If the ping succeeds, the lock is granted. Otherwise, the requester is deemed to have failed and the next lock request is serviced. The GLocks object also supports a write-lock steal mechanism, which allows one write lock request to move to the front of the write lock queue. The group service uses this to give failure notifications priority while still using locks correctly.

The GElection object supports elections among a group of nodes, with one GElection object per node. To operate correctly, each GElection object must be notified upon node failure. A new GElection object joining a group of GElection objects must also be given a reference to an existing GElection object in the group. Each GElection object contains a unique identifier and knows the identifiers of all other GElection objects. When the coordinator fails, each GElection object independently chooses the highest surviving identifier as the new coordinator.

The GList and GArray objects use parametric polymorphism to provide generic implementations of lists and arrays. The GList object uses type parameters to indicate the type of the list elements and the comparison type used to compare list nodes (which is usually a specific object in the list element). The GArray object uses a type parameter to indicate the type of the array elements.
Both the GList and the GArray provide "safe" add and remove operations: it is an error to add an element that is already in the list or array, and it is an error to remove an element that is not in the list or array. The GList uses a singly linked list, while the GArray uses the builtin array type. The GHashTable provides a hash table mapping type Any to type Any. It uses linear probing, and it grows when the table is more than 80% full to avoid clustering. Like the GList and GArray objects, the GHashTable provides safe insert and remove operations.

3.4 Performance

Several tests evaluate the performance of the group service on fundamental operations, and how these operations scale to larger groups located on more nodes. For the purposes of these tests, a larger group is one with more members. The tests are generally run on one to ten Emerald nodes, with the number of members ranging from 1 to 100. They are performed on five PCs running FreeBSD 2.2.x, each with a 200 MHz Pentium processor, 64 MB of memory, and at least a 10 Mbps Ethernet connection.

[Figure 3.2: New GRep (one series each for groups of 1, 5, 10, 25, 50 and 100 members)]

The results in Figure 3.2 show the time in milliseconds to create a new GRep for a group on n nodes when a GRep for the group already exists on n - 1 nodes, for groups of 1 to 100 members. For example, creating a GRep on a third node for a group with 25 members on two nodes takes approximately 300 milliseconds. Not shown on the graph is the 2 milliseconds it takes to create a new GRep on a single node. The smaller values for 10 nodes are an anomaly caused by reduced traffic on the network. These results are as expected.
The time increases with the number of nodes because each GRep is informed of every other GRep, so all GReps must be contacted when a new GRep is created. The time increases with the number of members because each GRep knows the entire group state, so more information must be passed to the new GRep on creation as the number of members grows.

[Figure 3.3: Add Member (x-axis: nodes)]

The results in Figure 3.3 show the time in milliseconds to add a member to an empty group where the group exists on 1 to 10 nodes. For example, adding a new member to an empty group on 6 nodes takes approximately 20 milliseconds. As discussed above, each GRep knows all group state, so the overhead increases as the number of nodes increases.

The results in Figure 3.4 show the time in milliseconds to add a member to an existing group on 1 to 10 nodes with 1 to 100 current members. For example, adding a member to a group with 100 members on 4 nodes takes approximately 25 milliseconds.

[Figure 3.4: Add Member Scaling (x-axis: nodes; one series each for 1, 5, 10, 25, 50 and 100 members)]

The increases in time with both nodes and members are partly due to the reasons discussed above. In addition, the "safe" linked list used to store the members causes extra overhead: before every insert, the list is searched for the item about to be inserted to ensure that it is not inserted twice. The cost of this search grows with the number of members (the length of the list).

[Figure 3.5: Move One (series: group member, non-group member)]

[Figure 3.6: Move All]

Figure 3.5 and Figure 3.6 show the overhead imposed on moves of group members by the group service.
Figure 3.5 shows the time in milliseconds to move a member of a group using the Emerald move keyword, where the group exists on 1 to 10 nodes. For comparison, it also shows the time to move an object that is not a member of a group. For example, moving a group member on 9 nodes takes approximately 50 milliseconds, while moving a non-group member takes approximately 1 millisecond. Figure 3.6 shows the time in milliseconds to move all members of a group using the group service moveAll operation, where the group exists on 1 to 10 nodes and has 1 to 100 members. For example, moving all members of a group with 25 members on 2 nodes takes approximately 100 milliseconds. The increases in time with both nodes and members have the same causes as discussed above. When the Emerald move keyword is used, the switches between Emerald and C code add further overhead.

[Figure 3.7: Failure Message Activity (messages sent and received, for the failure, no-failure, and normal cases)]

The results in Figure 3.7 show the number of messages transmitted to and from the node executing the following program. The program creates a group and ensures that every active node contains one member of the group. (The class of the member objects, mObjClass, is not shown.)

const failact <- object failact
  process
    const nl : ImmutableVector.of[NodeListElement] <- (locate self).getActiveNodes
    const g : Group <- Group.create
    var i : Integer
    for ( i <- 0 : i <= nl.upperBound : i <- i + 1 )
      const mObj <- mObjClass.create[i]
      move mObj to nl.getElement[i].getTheNode
      const b <- g.addMember[mObj]
    end for
  end process
end failact

The number of messages sent and received is given for both the case of no failures and the case of a group failure, on 2 to 10 nodes. For comparison, the figure also shows the number of messages sent and received when no program is executing.
For example, for a group on 9 nodes, approximately 800 messages are sent and received in the case of a group failure. In the normal case, with no program executing, approximately 20 messages are sent and received. Only slightly more messages are transmitted when the group fails than when it does not. This shows that most of the group service's message overhead is incurred during failure-free operation, not by failure handling.

[Figure 3.8: Regular Message Activity (x-axis: nodes; series: one member sent/received, two members sent/received)]

The results in Figure 3.8 show the number of messages transmitted to and from the node executing the above program. They also show the additional messages sent when a second member is added to the group at every node (achieved by repeating the for loop in the above program). Adding the second member transmits only slightly more messages, which shows that most messages in this case come from the initial setup of the GManagers and the GReps.

The results in this section show that there is substantial room to improve the performance of the group service, particularly its scaling. The two primary areas for improvement are the replication of group state on every GRep and the use of the linked list as a fundamental data structure. Maintaining group state on some fixed number k of GReps would provide k-way fault tolerance with substantially less overhead. The group service would then scale much better as the number of nodes increases. Replacing the linked list with, for example, a hash table would give much better scaling as the number of members increases. Even if the linked list is kept, its "safe" operations should be used only in a debug version.
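The cost of the "safe" linked list, and the suggested replacement, can be illustrated in Python by counting comparisons instead of measuring time. `SafeHashTable` follows the GHashTable description in Section 3.3.4: linear probing, growth when the table is more than 80% full, and safe insert and remove. It is a hypothetical sketch, not the thesis's Emerald code. `safe_list_comparisons` models a list that is scanned for duplicates before each insert.

```python
from typing import Any, List, Optional


class SafeHashTable:
    """Linear-probing hash table with 'safe' insert and remove (a sketch)."""

    _EMPTY = object()     # slot never used
    _DELETED = object()   # tombstone left by remove

    def __init__(self, capacity: int = 8) -> None:
        self._keys: List[Any] = [self._EMPTY] * capacity
        self._vals: List[Any] = [None] * capacity
        self._size = 0
        self.probes = 0   # slots examined by lookups, for comparison below

    def _find(self, key: Any) -> int:
        """Return the slot holding key, or -1 if key is absent."""
        n = len(self._keys)
        i = hash(key) % n
        for _ in range(n):
            self.probes += 1
            k = self._keys[i]
            if k is self._EMPTY:
                return -1
            if k is not self._DELETED and k == key:
                return i
            i = (i + 1) % n             # linear probing: try the next slot
        return -1

    def insert(self, key: Any, value: Any) -> None:
        if self._find(key) != -1:
            raise KeyError(f"duplicate key {key!r}")   # safe insert
        if self._size + 1 > 0.8 * len(self._keys):    # keep load <= 80%
            self._grow()
        n = len(self._keys)
        i = hash(key) % n
        while self._keys[i] is not self._EMPTY and self._keys[i] is not self._DELETED:
            i = (i + 1) % n
        self._keys[i], self._vals[i] = key, value
        self._size += 1

    def remove(self, key: Any) -> Any:
        i = self._find(key)
        if i == -1:
            raise KeyError(f"missing key {key!r}")     # safe remove
        value = self._vals[i]
        self._keys[i], self._vals[i] = self._DELETED, None
        self._size -= 1
        return value

    def get(self, key: Any) -> Optional[Any]:
        i = self._find(key)
        return None if i == -1 else self._vals[i]

    def _grow(self) -> None:
        live = [(k, v) for k, v in zip(self._keys, self._vals)
                if k is not self._EMPTY and k is not self._DELETED]
        self._keys = [self._EMPTY] * (2 * len(self._keys))
        self._vals = [None] * len(self._keys)
        self._size = 0
        for k, v in live:               # rehash every live entry
            self.insert(k, v)


def safe_list_comparisons(n: int) -> int:
    """Comparisons made by n 'safe' inserts into a list scanned before each insert."""
    items: List[int] = []
    comparisons = 0
    for x in range(n):
        for y in items:                 # duplicate check walks the whole list
            comparisons += 1
            if y == x:
                raise ValueError("duplicate")
        items.append(x)
    return comparisons


table = SafeHashTable()
for x in range(100):
    table.insert(x, str(x))
print(safe_list_comparisons(100), table.probes)
```

For 100 sequential inserts, the scanned list performs 4950 comparisons (0 + 1 + ... + 99), a count that grows quadratically with the number of members. The hash table's probe count stays roughly proportional to the number of inserts.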
3.5 Limitations

The group service correctly handles network partitions when group members are split between partitions. Its behaviour is less clean when a group listener is partitioned from the members of the group. Ideally, a group listener is not notified unless the group fails. With such a partition, however, a group listener could be notified that a group has failed when in fact the group is still active in another partition. A possible solution is to fail the group whenever a group listener's node fails. This would avoid the situation described, but would create a two-way dependency between the group and the listener.

Emerald is a garbage-collected language, so objects are collected when they are no longer referenced. To maintain the standard Emerald semantics, the group service does not have a close or free operation for a group. This means that the GRep for a group will never be deleted unless the group has failed. The GRep is a relatively small object, but in a long-running system with many groups this could waste a large amount of memory. In addition, the GRep maintains references to the members and listeners of the group, so the members and listeners will also never be garbage collected. One solution is for the user to ensure that all members and listeners are removed from a group when it is no longer required. A better solution is for the garbage collector to collect the GReps when no immutable group objects for them remain, since then no reference to the GRep can come from outside the group service. However, this requires tight coupling between the garbage collector and the group service.

The current implementation of the group service correctly notices and synchronously handles the direct move of a group member. However, when an object with an attached group member is moved, the group service moves the group member asynchronously.
This slightly changes the semantics of fix and refix, as the object will not in fact have been fixed or refixed when the call to fix or refix returns. More importantly, repeated moves of a group member via attach will cause an error in the group service. The current Emerald implementation provides no mechanism for blocking a process on an upcall. To handle moves of group members via attach correctly, the group service requires a blocking upcall mechanism from Emerald; with it, the process requesting the move would not be resumed until the group service had properly handled the moves of the attached group member(s).

As implemented, the group service can handle only a single failure at a time. Multiple concurrent failures may cause the group service itself to fail because of race conditions. This can be corrected by a more careful implementation of the node-down handling code; the extra complexity was not considered worthwhile given the rarity of concurrent failures.

The group service currently assumes that failures occur at the granularity of nodes. It should be relatively simple to extend it to detect the failure of individual objects on active nodes. This situation is typically the result of a programming error, and handling it may add overhead to the Emerald runtime.

Chapter 4

Examples

4.1 Introduction

This chapter gives examples of using the group service to create fault-tolerant distributed applications in Emerald. The first example is a simple token passing application that demonstrates how to use the group service. The second example is a reliable name server, which provides operations to bind names to values, look up names, and unbind names. The third example is a distributed system monitor that provides graphical information on currently available nodes, the load on nodes, bandwidth and latency between nodes, and distributed garbage collection.
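All three examples rely on the same contract: a group is either entirely available or entirely failed, and its listeners hear about the failure. As a reading aid, the Python below models that contract in a single process. It is an illustrative sketch, not the Emerald group service: the class and method names (`Group`, `add_member`, `add_listener`, `fix_all`, `node_down`) are hypothetical, members are reduced to a name plus the node they reside on, and listener placement and network partitions are ignored.

```python
from typing import Callable


class Group:
    """Toy model of a group: all members available, or all failed."""

    def __init__(self) -> None:
        self.members: dict[str, str] = {}  # member name -> node it resides on
        self.listeners: list[Callable[[], None]] = []
        self.failed = False

    def add_member(self, name: str, node: str) -> bool:
        if self.failed or name in self.members:
            return False
        self.members[name] = node
        return True

    def add_listener(self, on_unavailable: Callable[[], None]) -> bool:
        if self.failed:
            return False
        self.listeners.append(on_unavailable)
        return True

    def fix_all(self, node: str) -> None:
        """Location: move every member to one node."""
        for name in self.members:
            self.members[name] = node

    def node_down(self, node: str) -> None:
        """Reliability and notification: losing any member's node fails the whole group."""
        if self.failed or node not in self.members.values():
            return  # group already failed, or no member was on that node
        self.failed = True
        self.members.clear()  # every member, wherever it lives, is now unavailable
        for notify in self.listeners:
            notify()  # each listener is told exactly once
```

A listener that simply creates a fresh group on the surviving nodes is, in miniature, the restart strategy used by the fault-tolerant token passer in Section 4.2.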
4.2 Token Passer

This example is a simple token passing application that demonstrates how to use the group service. The code below shows the important sections of a non-fault-tolerant version of the token passer. In total, this program contains 229 lines of Emerald code (the bulk of these implementing the GList data structure). The program creates an object on every active node and then moves itself as a token between the nodes, invoking the object on each node it visits. This could serve as the basis for a shared resource management program.

Note that this program is not fault tolerant: the failure of any node causes the program to fail. Making it fault tolerant without the group service would require unavailable handlers on the invocations of the list object, the list nodes, and the objects referenced from the list nodes. It might also require timers to ensure the token is not lost, and a coordinator to ensure that multiple tokens are not introduced. In addition, the program is not informed of any nodes that become active after it starts.
const main <- object main
  initially
    const t <- tokenPasser.create
  end initially
end main

const tokenPasser <- class tokenPasser
  process
    const n: Node <- locate self
    const nl: ImmutableVector.of[NodeListElement] <- n.getActiveNodes
    const mObjList: GList.of[mObjNode, Node] <- GList.of[mObjNode, Node].create
    var i: Integer
    for ( i <- 0 : i <= nl.upperBound : i <- i + 1 )
      const mObj: mObjClass <- mObjClass.create[i]
      const mon: mObjNode <- mObjNode.create[mObj, nl.getElement[i].getTheNode]
      mObjList.add[mon]
      fix mObj at nl.getElement[i].getTheNode
    end for
    const mol: Vector.of[mObjNode] <- mObjList.list
    const delayTime: Time <- Time.create[1, 0]
    loop
      for ( i <- 0 : i <= mol.upperBound : i <- i + 1 )
        move self to mol.getElement[i].getNode
        mol.getElement[i]
        (locate self).delay[delayTime]
      end for
    end loop
  end process
end tokenPasser

const mObjClass <- class mObjClass [id: Integer]
  export operation run
    (locate self).getStdOut.putString["Greetings from " || id.asString || "\n"]
  end run
end mObjClass

The code below shows the same token passing application, enhanced for fault tolerance using the group service. In total, this program contains 240 lines of Emerald code. The 11 additional lines are marked with %%. These lines create a group, add the objects in the token passer to the group, create a group listener object (tokenPasserL), and register this object as a group listener. Upon the failure of any node, the group service fails the current instance of the token passing application, and the group listener starts a new instance. The program could also easily be enhanced to learn of newly active nodes using the node listener portion of the group service.
const main <- object main
  initially
    const t <- tokenPasser.create
  end initially
end main

const tokenPasser <- class tokenPasser
  const g: Group <- Group.create                                   %%
  const tokenPasserL <- immutable object tokenPasserL              %%
    export operation groupUnavailable                              %%
      const t <- tokenPasser.create                                %%
    end groupUnavailable                                           %%
  end tokenPasserL                                                 %%
  process
    const n: Node <- locate self
    const nl: ImmutableVector.of[NodeListElement] <- n.getActiveNodes
    const mObjList: GList.of[mObjNode, Node] <- GList.of[mObjNode, Node].create
    var i: Integer
    var b: Boolean <- g.addMember[self]                            %%
    b <- g.addMember[mObjList]                                     %%
    for ( i <- 0 : i <= nl.upperBound : i <- i + 1 )
      const mObj: mObjClass <- mObjClass.create[i]
      b <- g.addMember[mObj]                                       %%
      const mon: mObjNode <- mObjNode.create[mObj, nl.getElement[i].getTheNode]
      b <- g.addMember[mon]                                        %%
      mObjList.add[mon]
      fix mObj at nl.getElement[i].getTheNode
    end for
    b <- g.addGListener[tokenPasserL]                              %%
    const mol: Vector.of[mObjNode] <- mObjList.list
    const delayTime: Time <- Time.create[1, 0]
    loop
      for ( i <- 0 : i <= mol.upperBound : i <- i + 1 )
        move self to mol.getElement[i].getNode
        mol.getElement[i]
        (locate self).delay[delayTime]
      end for
    end loop
  end process
end tokenPasser

const mObjClass <- class mObjClass [id: Integer]
  export operation run
    (locate self).getStdOut.putString["Greetings from " || id.asString || "\n"]
  end run
end mObjClass

4.3 Name Server

This example is a reliable name server. The name server provides operations to bind names to values, look up names, and unbind names. Its interface is an immutable object with these three operations. Names are Emerald strings, and values may be any type of Emerald object. The name server is replicated on two nodes using a primary/backup model of replication. When one of the replica nodes fails, a new replica is created and moved to a functioning node. New nodes joining the distributed environment can use the services of an existing name server.
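The replica management described above (one primary, one backup, and a replacement backup after each failure) can be modelled independently of Emerald. In this hypothetical Python sketch, `NameServerPair.node_down` plays the role of the two groups' listeners: the surviving replica becomes primary if it is not already, and a new backup holding a copy of its bindings is placed on another live node, mirroring the buildOther and groupUnavailable operations of Section 4.3.2. Choosing the first other live node is an assumption; the name server only requires the two replicas to be on separate nodes when possible.

```python
from dataclasses import dataclass, field


@dataclass
class Replica:
    node: str
    is_primary: bool
    bindings: dict = field(default_factory=dict)


class NameServerPair:
    """Hypothetical model: one primary and one backup name server replica."""

    def __init__(self, live_nodes: list[str]) -> None:
        self.live = list(live_nodes)
        self.primary = Replica(self.live[0], True)
        # With a single node both replicas share it, so its failure loses the service.
        self.backup = Replica(self.live[1] if len(self.live) > 1 else self.live[0], False)

    def bind(self, name: str, value: object) -> bool:
        self.primary.bindings[name] = value
        self.backup.bindings[name] = value  # primary/backup: update the backup too
        return True

    def node_down(self, node: str) -> None:
        if node in self.live:
            self.live.remove(node)
        lost = [r for r in (self.primary, self.backup) if r.node == node]
        if not lost:
            return
        if len(lost) == 2:
            raise RuntimeError("both replicas have failed")
        survivor = self.backup if lost[0] is self.primary else self.primary
        survivor.is_primary = True  # "if not primary, become primary"
        others = [n for n in self.live if n != survivor.node]
        spare = others[0] if others else survivor.node
        self.primary = survivor
        self.backup = Replica(spare, False, dict(survivor.bindings))  # "buildOther"
```

Because the model has no permanent storage, losing the last node holding both replicas raises an error, matching the failure case the text describes.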
The fault-tolerance portion of the name server is implemented using the group service, primarily for notification and location. Because only one backup is used, the name server will fail if the nodes containing the primary and backup replicas fail simultaneously. The name server does not use permanent storage, and therefore it will also fail if the distributed system contains only one node and that node fails.

4.3.1 Interface

NameServer has the following interface:

class export operation init -> [ns : NameServer]
    Joins an existing name server, if one exists on an active node, or creates a new one. Must be invoked on any node where the name server is used. Returns an object of type NameServer that is invoked to perform bind, lookup, and unbind operations.

operation bind [n : String, v : Any] -> [r : Boolean]
    Binds value v to name n. Returns true on success, false on failure.

operation lookup [n : String] -> [v : Any]
    Looks up name n and returns the associated value v, or nil if there is no associated value.

operation unbind [n : String] -> [r : Boolean]
    Unbinds the value from name n. Returns true on success, false on failure.

4.3.2 Implementation

The core of the name service is written in approximately 400 lines of Emerald code, excluding comments and blank lines. Using the group service provides a clean separation between failure handling and regular operation, with approximately 50 lines of Emerald code devoted to failure handling.

The interface to the name service is an immutable object. This shields users of the service from failures within the service, taking advantage of the property that Emerald immutable objects are never unavailable. On each bind, lookup, or unbind operation, the immutable object attempts to perform the operation on the primary replica. If that fails, it attempts to perform the operation on the backup replica. If that also fails, the name server is considered to have failed.
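The failover performed by the immutable interface object can be sketched as follows. This is a hypothetical Python model, not Emerald: `Unavailable` stands in for Emerald's unavailable condition, and the front end is given two zero-argument callables that return the current primary and backup (in the name server these are located through the Gaggle mechanism). The front end keeps no per-request state, which is the property that lets the Emerald version be an immutable object. `FakeReplica` is a test double whose two instances share one dictionary to mimic primary/backup propagation.

```python
from typing import Any, Callable


class Unavailable(Exception):
    """Stand-in for invoking an object whose node has failed."""


class NameServerFront:
    """Hypothetical stateless front end: try the primary, then the backup."""

    def __init__(self, get_primary: Callable[[], Any], get_backup: Callable[[], Any]) -> None:
        self._replicas = (get_primary, get_backup)

    def _invoke(self, op: str, *args: Any) -> Any:
        for get_replica in self._replicas:
            try:
                return getattr(get_replica(), op)(*args)
            except Unavailable:
                continue  # fall through to the next replica
        raise Unavailable("name server has failed: primary and backup unavailable")

    def bind(self, n: str, v: Any) -> bool:
        return self._invoke("bind", n, v)

    def lookup(self, n: str) -> Any:
        return self._invoke("lookup", n)

    def unbind(self, n: str) -> bool:
        return self._invoke("unbind", n)


class FakeReplica:
    """Test double for a replica; `up` simulates whether its node is alive."""

    def __init__(self, store: dict) -> None:
        self.store, self.up = store, True

    def _check(self) -> None:
        if not self.up:
            raise Unavailable()

    def bind(self, n, v):
        self._check()
        self.store[n] = v
        return True

    def lookup(self, n):
        self._check()
        return self.store.get(n)  # None plays the role of nil

    def unbind(self, n):
        self._check()
        return self.store.pop(n, None) is not None
```

Clients only ever see the front end, so a primary failure is invisible to them as long as the backup answers; only the loss of both replicas surfaces as an error.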
In order to locate the current primary and backup replicas, the name server uses the Gaggle mechanism [32]. Both the primary and backup replicas create groups and place all of their objects into these groups. The primary becomes a listener for the backup group, and the backup becomes a listener for the primary group. In the following (simplified) code fragment from the name server, the primary creates the backup, adds a listener object to the backup group, ensures that the listener nodes include the node where the other replica resides, and moves the primary and backup to separate nodes. The creation operation for the backup includes the addition of a listener for the primary group.

export operation buildOther
  const all <- (locate self).getActiveNodes
  % create backup
  other <- nameServerData.create[gagg, false, self, thisGroup]
  otherGroup <- other.getGroup
  % add listener to backup group
  otherGroup.addGListener[nameServerListener.create[self, gagg, (locate self).getName]]
  % ensure listener nodes include other node
  thisGroup.addListenerNode[all[1]$theNode]
  otherGroup.addListenerNode[all[0]$theNode]
  % move to different nodes
  thisGroup.fixAll[all[0]$theNode]
  otherGroup.fixAll[all[1]$theNode]
end buildOther

The group service informs the primary of the failure of the backup, and the backup of the failure of the primary. Upon a failure, the surviving replica becomes the primary replica and creates a new backup. In this code, nsd refers to the name server that created this listener. Note that if nsd is unavailable, both replicas must have failed simultaneously.

export operation groupUnavailable
  begin
    % if not primary, become primary
    if !nsd.getPrimary then
      nsd.setPrimary[true]
    end if
    % build other
    nsd.buildOther
    failure
      stdout.putString["Both replicas have failed\n"]
    end failure
  end
end groupUnavailable

4.4 Distributed Monitor

The Distributed Monitor is an X Windows application that provides graphical information about the distributed Emerald environment (Figure 4.1). Its four components are the Node Status Monitor, the CPU Utilization Monitor, the Link Load Monitor, and the Garbage Collection Monitor. The Node Status Monitor lists all active and failed nodes. The CPU Utilization Monitor shows the CPU activity of each node. The Link Load Monitor provides latency and bandwidth information for links between nodes. The Garbage Collection Monitor provides information on garbage collection at each node.

Figure 4.1: Distributed Monitor (main window with buttons for the Node Status, CPU Utilization, Link Load, and Garbage Collection monitors)

Each of the four monitors is logically split into two components: a view and an application. The view is responsible for displaying the data and responding to user input using X Windows. The application is responsible for creating the data and passing it to the view.

The monitors use the group service to make the Distributed Monitor fault tolerant. Uses of the group service common to all the monitors are described here; techniques specific to each monitor are described below. Each monitor typically creates two groups: one for the view and one for the application. The application is a listener for the view group, and the view is a listener for the application group. When notified of the failure of the application, the view creates another instance of the application. When notified of the failure of the view, the application exits. The application portion of each monitor uses the node listener service to maintain the set of currently active nodes.
This information is used to determine which nodes data should be collected from, and is passed to the view for display.

4.4.1 Node Status Monitor

Figure 4.2 shows the Node Status Monitor. A node on port 17586 is active, and node mako.cs.ubc.ca:17586 has failed. Nodes are uniquely identified by a name and port number pair. When mako.cs.ubc.ca:17586 becomes active again, it will be placed in the live nodes column. A new instance of the Node Status Monitor shows all active hosts, and any hosts that fail after the instance is created.

The Node Status Monitor is the simplest of the four monitors: it uses only the node listener service and resides on a single node with no mobility. Because of this simple behaviour, the view and the application fail together if their node fails, so no groups are required. The node listener service is used by creating an empty group (a group with no members) and adding a node listener.

Figure 4.2: Node Status Monitor (columns of live nodes and dead nodes)

4.4.2 CPU Utilization Monitor

Figure 4.3 shows the CPU Utilization Monitor. Node manning.cs.ubc.ca:17586 is busy 59% of the time, and a node on port 17842 is busy 15% of the time. The utilization of each CPU is updated at a regular interval, currently one second. Failed hosts are removed from the monitor, and newly active nodes are added.

The application is composed of a base, residing on the same node as the view, and CPU monitors, one on each node. Groups are created for the base and for each of the CPU monitors. The base creates listeners for each of the CPU monitor groups, and the CPU monitors are listeners for the base group. When notified by the group service that the base has failed, a CPU monitor exits.
When notified by the group service that a CPU monitor has failed, the listener informs the view that a node has failed. When notified that a node is up, the base creates a new CPU monitor and creates a listener for the CPU monitor group, as shown in the following (simplified) code fragment.

Figure 4.3: CPU Utilization Monitor

export operation nodeUp[n: Node]
  const gcpum: gcpuMonitor <- gcpuMonitor.create[thisGroup, x, n]
  gcpuMonitors.add[gcpum]
  const name: String <- n.getName || ":" || (n.getLNN / 0x10000).asString
  const r: Boolean <- gcpum.getGroup.addGListener[gcpuMListener.create[name]]
end nodeUp

Upon creation, each CPU monitor creates a group, adds all the objects it contains to the group, and fixes the group at the node passed as a constructor parameter. The CPU monitor then enters a loop measuring CPU activity. The measurement is performed through a call to C code and is platform dependent.

4.4.3 Link Load Monitor

Figure 4.4 shows the Link Load Monitor. The latency and bandwidth information is displayed as a table, with rows indicating the from nodes and columns indicating the to nodes. For example, the latency from manning.cs.ubc.ca:17586 to blackcomb.cs.ubc.ca:17586 is 944 microseconds, with a bandwidth of 29 Mbps. On the diagonal of the table the from and to nodes are the same, and the measurements show theoretical extremes.

Figure 4.4: Link Load Monitor (latency and bandwidth table between nodes, with two threshold sliders and a Quit button)

The two sliders are used to highlight links with unacceptably high latencies or unacceptably low bandwidths.
Links with unacceptable latencies are highlighted in one colour (magenta), links with unacceptable bandwidths in another colour (blue), and links where both are unacceptable in a third colour (red).

The application component of the Link Load Monitor is structured as a single group roaming between nodes. The group first moves all its members to a single node, and then moves the single group member that performs the measurement to each of the other nodes in turn. The group service ensures that if a node containing part of the application fails, the rest of the application also fails. In this case the view recreates the application as described above.

4.4.4 Garbage Collection Monitor

Figure 4.5 shows the Garbage Collection Monitor. The Garbage Collection Monitor is intended primarily as an empirical tool to help understand the characteristics of distributed garbage (garbage that cannot be collected by a local collection). For each node, it measures the percentage of the old generation space that is free, the number of objects that could be remotely referenced (objects with OIDs), and the number of references the node holds for objects on other nodes (stubs). For example, one node on port 17586 has 62% of its old generation space free, 4622 objects that could be remotely referenced, and 103 stubs for objects on other nodes.

Figure 4.5: Garbage Collection Monitor (per-node free old generation space, objects with OIDs, and stubs)

The application component of the Garbage Collection Monitor is structured as a single group roaming between nodes, measuring garbage collection information for each node while the application is resident on that node.
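The roaming structure shared by the Link Load and Garbage Collection monitors, together with the view's restart-on-failure policy, can be sketched as follows. This is a hypothetical, sequential Python model: `measure` stands in for the real latency and bandwidth probe, `NodeDown` stands in for the group service reporting that the application group has failed, and dropping the dead node from the live list approximates what the node listener service provides. The restart limit is an arbitrary assumption.

```python
from typing import Callable


class NodeDown(Exception):
    """Raised by a probe when the node it needs has failed."""

    def __init__(self, node: str) -> None:
        super().__init__(node)
        self.node = node


Probe = Callable[[str, str], float]


def roam_once(nodes: list[str], measure: Probe) -> dict[tuple[str, str], float]:
    """One pass of the application group over the network."""
    results: dict[tuple[str, str], float] = {}
    for home in nodes:  # the whole group moves to `home`
        for target in nodes:
            if target != home:  # only the measuring member moves to `target`
                results[(home, target)] = measure(home, target)
    return results


def view(nodes: list[str], measure: Probe, max_restarts: int = 3) -> dict[tuple[str, str], float]:
    """The view listens on the application group and recreates it after a failure."""
    live = list(nodes)
    for _ in range(max_restarts + 1):
        try:
            return roam_once(live, measure)
        except NodeDown as e:
            # The whole group failed; discard partial results and start afresh.
            live.remove(e.node)
    raise RuntimeError("application failed too many times")
```

Treating any node failure as a failure of the whole pass keeps the application logic free of partial-failure cases, at the cost of repeating measurements after a restart.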
Chapter 5

Related Work

SOS [57] [58] is an object-oriented distributed operating system that provides object persistence and migration, built using C++ with extensions. A typical SOS object is on the order of 100 or more bytes, an object being an arbitrary collection of data with code attached. Distributed services are built using the concept of a fragmented object. To use a distributed service, a client must acquire a local proxy of the fragmented object, and migration is used to acquire this proxy. To maintain consistency within groups of objects in spite of unexpected events such as failures, SOS uses the concept of dependencies. Certain objects can be declared dependent on other objects, so that the dependent object is informed of significant changes in the state of the object it depends on. These significant changes include both system-detectable and user-defined events. This is similar to the listener mechanism in the group service, although it is unclear whether the dependency mechanism was ever implemented.

COOL [2] is the Chorus Object-Oriented Layer, an object-oriented environment built on top of the Chorus microkernel that supports object migration and persistence. It is meant to address the mismatch between the services and abstractions offered by the operating system and those offered by the language environment, giving programmers single-address-space semantics in a distributed environment. COOL is composed of three layers: base, generic run time, and language-specific run time. The base provides an image of a single microkernel built over several distributed Chorus microkernels, and provides memory services, message passing, distributed shared memory, and persistent storage. The base supports three kinds of migration: migration inside a context group (a group of related memory regions), migration between context groups, and migration from disk into a context.
The generic run time implements the COOL notion of objects: a combination of state and code. COOL objects are grouped in modules, which are application-defined clusters of related objects. A cluster is the basic unit of distribution and provides an abstraction of distributed shared virtual memory. COOL uses the notion of an activity for a single thread of control, while a job is a collection of (possibly) distributed and related activities; a job is usually a single distributed application. COOL uses upcalls into the language-specific run time to specialize the generic run time. Upcalls are used for pointer swizzling upon migration and for invoking proxies on RPC-like calls. The language-specific run time is essentially a preprocessor that generates the appropriate calls to the generic run time. For example, COOL++ is an extended version of C++ that is preprocessed into standard C++. Programmers must define objects using an Interface Definition Language to support persistence and migration. COOL is not built to tolerate failures. For example, if an object is mapped into memory on one machine and physically stored on another, the failure of the machine storing the object causes unpredictable behaviour.

BirliX [45] provides an object migration mechanism that is adaptable by separating policy and mechanism. In BirliX, objects have both functional and non-functional properties: functional properties are those defined by the type of the object, and non-functional properties are the object's infrastructure, the run time and inherited primary type. Each instance exists in an object management system. Migrating an instance involves suspending the instance, generating a checkpoint on secondary storage, transferring the checkpoint to the destination object management system, regenerating the state from the checkpoint, and resuming activities.
BirliX instances persist as long as at least one reference to them or at least one thread in them exists. BirliX instances are implemented by a structure called a team, essentially an instance together with the storage and computing resources (such as segments) it requires. Migrating an object means migrating its team, which makes object migration in BirliX a relatively heavyweight mechanism. Each object has a default migration mechanism inherited from the primary type. This can be changed by attaching a meta object specifying a different migration mechanism, and type-specific migration mechanisms can also be applied. An example is given of a meta implementation of migration that provides more reliable object migration at the cost of extra overhead. Applications can be made fault tolerant in BirliX by checkpointing the state of an instance in persistent store. This is feasible only where the frequency and number of checkpoints do not produce unacceptable run-time overhead; a system with fine-grained object mobility would require many checkpoints, possibly taken with great frequency.

Shadows is a distributed object system influenced by the work on Arjuna. The first version of Shadows [11] provides a minimal set of services for a distributed object system, with other services built on top of these. The basic services are object servers, location-transparent invocation, and object migration. Object servers give clients access to objects running on that node, with each server managing multiple objects of a single class. Location-transparent invocation is achieved using a local version of the object, known as a shadow, that essentially acts as a stub or proxy. Object migration is achieved by giving object servers the ability to send and receive object state from other object servers or clients.
Only object state is migrated, and objects must provide the methods to marshal and unmarshal their state. In addition, only quiescent objects may be migrated. These basic properties are provided as a set of C++ classes and used to create Shareable, Durable, Recoverable, and Lockable classes, which may be used flexibly in any combination through multiple inheritance. The first version of Shadows was implemented on a network of transputers running the Helios operating system.

The second version of Shadows [12] was implemented on top of UNIX and followed the same basic architecture, with some enhancements. It provides facilities for object naming, location, invocation, persistence, and garbage collection, and it tolerates node failures and network partitions. In the second version, the object server is replaced by the object manager, and the proxy by references. References use stub-scion pair (SSP) chains, to which fault tolerance is added. Names are context dependent, a name being an object manager name combined with a local name. Multiple names are achieved by using multiple references as aliases. As with BirliX, objects can be made fault tolerant through the use of checkpoints.

FT-SR [55] provides fault-tolerance mechanisms in the SR distributed programming language. FT-SR attempts to strike a balance between a low-level language with no support for fault tolerance or distributed programming, such as C, and a high-level language with a very specific fault-tolerance and distributed programming model, such as Argus. It does so by providing general mechanisms that can be used to build specific fault-tolerance policies. The mechanisms include replication, recovery, and failure notification. Failure notification can be either synchronous or asynchronous. Synchronous notification is synchronous with respect to a call.
Asynchronous notification occurs when a resource is monitored and an operation is invoked if the resource fails. FT-SR assumes fail-stop modules with fail-silent failure semantics. Fail-stop modules are modules whose operations either execute to completion or fail. Fail-silent semantics describe failures that result in a complete cessation of execution. Fail-stop modules provide stronger failure semantics than groups in the group service, since a group may fail in the middle of an execution. To provide fail-stop modules with the group service, a transaction layer built on the group service would need to be introduced. The assumption of fail-silent semantics does not hold in distributed systems with fine-grained object mobility, where portions of a server may fail; this is the environment the group service addresses. Synchronous notification is analogous to the Emerald unavailable mechanism, and asynchronous notification is analogous to the group service notification mechanism. The FT-SR mechanism is stylistically different in that FT-SR procedures to be notified can be called with various arguments and are nested in the code using an exception-like mechanism. The FT-SR notification mechanism is also tightly integrated with replication: a replicated resource does not cause notification on the failure of a replica unless it is the last replica.

Obliq [10] is a language that uses distributed lexical (static) scoping to support distributed object-oriented programming. This ensures that computations can roam over the network while maintaining network connections. For example, when a procedure is accepted for execution at a compute server, the variables in the procedure maintain the bindings determined by lexical scoping at the client. Moving a variable to another site results in a network reference. Objects are the basic structure in an Obliq program; there are neither classes nor inheritance.
Object migration is achieved by cloning the object state at the destination site and turning the object at the source site into an alias to the clone. Obliq raises exceptions on communication failures, and these can be trapped. Such a failure may mean that a machine has crashed or that a process has terminated. No automatic recovery is provided.

Rover [33] provides a toolkit that isolates mobile application developers from the limitations of mobile communication systems, handling client and communication failures. An extension to the Rover toolkit [34] also supports server failures, through stable logging of messages, persistent variables, failure recovery procedures, and detection and restart of failed servers. Failures are assumed to be transient and recoverable, such as a power failure or a bug in a rarely used code path. The Rover toolkit is intended for a client/server model of distributed application, while the group service operates in the more general environment of fully distributed applications with object mobility. Conceptually, the group service is intended for a lower layer of abstraction than the Rover toolkit, and a toolkit like Rover that assumes crash failure semantics could be built on top of the group service. Detection and restart of failed servers could be built on the notification component of the group service, with the recovery procedures invoked as part of the restart. Stable logging of messages and persistent variables would need to be created independently of the group service, perhaps using a transaction or replication mechanism.

Distributed Oz [52], a concurrent object-oriented language, provides mobility of objects in a network of systems. Objects are fine-grained language-level entities and may be active or passive. The main contribution of Distributed Oz is that the implementation of objects is network transparent (computations behave correctly independently of how they are partitioned between sites).
This is achieved by avoiding forwarding chains through intermediate sites for mobile objects, which removes the problem of a mobile object leaving behind a trail of surrogate objects that forward messages to it. The failure model of Distributed Oz is not defined in [52]; failures are discussed only as exceptions. Finding a high-level abstraction for fault tolerance that separates an application's functionality from its fault-tolerant behaviour is noted as future work. The group service provides this separation by giving applications a well-understood failure model, essentially a network-transparent failure model.

Chapter 6

Future Work and Conclusions

6.1 Future Work

Future work can be divided into two broad categories. The first is improving and enhancing the current implementation; this category addresses the issues identified in Sections 3.4 and 3.5 (Performance and Limitations). The second is applying and extending the concepts of the group service to other areas.

The group service provides crash failure semantics. Section 4.3 investigates the use of replication with the group service. Further work is needed to investigate the structuring and use of other distributed systems fault-tolerance techniques with the group service, such as distributed transactions, checkpointing/logging, and process groups. Each of these techniques may impose specific requirements on the group service; for example, checkpointing/logging may require a checkpoint to be taken prior to group failure, if possible.

With the current interest in the Java programming language and the likelihood that it will become a de facto standard for programming distributed systems, it is also important to investigate the application of grouping to this environment. Java as it currently exists does not have process or object mobility. However, the Java environment is well suited to adding this capability.
In addition, grouping may be of some use even without object mobility, for example when a logical component is located on several different physical hosts.

One of the strongest justifications for object mobility comes from dynamic load balancing: the ability to migrate objects to balance CPU and other resource utilization across a group of systems. Further investigation into the interaction between dynamic load balancing and the group service is required. For example, what effect does the overhead of the group service have on load balancing policies?

6.2 Conclusions

Implementing fault tolerance in distributed systems with fine-grained object mobility imposes some unique constraints. In particular, software components in these systems cannot generally assume crash failure semantics. Because the objects in a component may reside at and move among multiple hosts in the network, at best arbitrary failure semantics can be assumed. Unfortunately, almost all classic techniques for fault tolerance in distributed systems assume crash failure semantics.

We have discussed a technique called grouping that places the objects of a component into a logical group, with the condition that either all objects in the group are available or all have failed. This allows crash failure semantics to be assumed, and classic distributed systems fault-tolerance techniques such as replication and distributed transactions to be applied, even though a component may reside at and move among multiple nodes. This technique has been implemented in the Emerald programming language and environment as a group service. The group service provides three fundamental services: reliability, notification, and location. Reliability enforces the condition that the objects in a group are either all available or all failed. Notification allows other objects to be informed of the failure of a group.
Location, the third service, enables convenient movement of all members of a group. Two example applications using this service, a name server and a distributed system monitor, demonstrate that the technique can be used to build practical fault-tolerant applications. The name server uses a primary/backup model of replication. The distributed system monitor is composed of four monitor applications: an active/failed node monitor, a CPU load monitor, a latency/bandwidth monitor, and a garbage collection monitor. All of these applications use the group service to achieve fault tolerance.

Bibliography

[1] A. El Abbadi, D. Skeen, and F. Cristian. An efficient fault-tolerant protocol for replicated data management. In 4th Annual ACM SIGACT/SIGMOD Symposium on Principles of Data Base Systems, 1985.
[2] P. Amaral, C. Jacquemot, P. Jensen, R. Lea, and A. Mirowski. Transparent object migration in COOL-2. In Proceedings of Workshop on Dynamic Object Placement and Load-Balancing in Parallel and Distributed Systems, ECOOP'92, 1992.
[3] J. Bartlett. A nonstop kernel. In 8th ACM Symposium on Operating System Principles, 1981.
[4] P. A. Bernstein, V. Hadzilacos, and N. Goodman. Concurrency Control and Recovery in Database Systems. Addison Wesley, 1987.
[5] P. A. Bernstein, D. W. Shipman, and J. B. Rothnie. Concurrency control in a system for distributed databases (SDD-1). ACM Transactions on Database Systems, 5(1), 1980.
[6] K. P. Birman. The process group approach to reliable distributed computing. Communications of the ACM, 36(12), 1993.
[7] K. P. Birman and T. A. Joseph. Reliable communication in the presence of failures. ACM Transactions on Computer Systems, 5(1), 1987.
[8] A. P. Black. Supporting distributed applications: Experience with Eden. In 10th ACM Symposium on Operating System Principles, 1985.
[9] T. C. Bressoud and F. B. Schneider. Hypervisor-based fault-tolerance. ACM Transactions on Computer Systems, 14(1), 1996.
[10] L. Cardelli. A language with distributed scope. Computing Systems, 8(1), 1995.
[11] S. J. Caughey, G. D. Parrington, and S. K. Shrivastava. Shadows - a flexible support system for objects in distributed systems. In Proceedings of the 3rd IEEE International Workshop on Object Orientation in Operating Systems (IWOOOS'93), 1993.
[12] S. J. Caughey and S. K. Shrivastava. Architectural support for mobile objects in large scale distributed systems. In Proceedings of the 5th IEEE International Workshop on Object Orientation in Operating Systems (IWOOOS'95), 1993.
[13] S. Ceri and G. Pelagatti. On the use of optimistic methods for concurrency control in distributed databases. In Proceedings of the 6th Berkeley Workshop on Distributed Data Management and Computer Networks, 1982.
[14] S. Ceri and G. Pelagatti. Distributed Databases: Principles and Systems. McGraw-Hill, 1985.
[15] E. G. Chang and R. Roberts. An improved algorithm for decentralized extrema-finding in circular configurations of processors. Communications of the ACM, 22(5), 1979.
[16] P. M. Chen, E. K. Lee, G. A. Gibson, R. H. Katz, and D. A. Patterson. RAID: High-performance, reliable secondary storage. ACM Computing Surveys, 26(2), 1994.
[17] P. M. Chen, W. T. Ng, S. Chandra, C. M. Aycock, G. Rajamani, and D. Lowell. The Rio file cache: Surviving operating system crashes. In Proceedings of the 1996 International Conference on Architectural Support for Programming Languages and Operating Systems (ASPLOS), 1996.
[18] A. Choudhary, W. Kohler, J. Stankovic, and D. Towsley. A modified priority based probe algorithm for distributed deadlock detection and resolution. IEEE Transactions on Software Engineering, 15(1), 1989.
[19] G. Coulouris, J. Dollimore, and T. Kindberg. Distributed Systems: Concepts and Design. Addison Wesley, second edition, 1994.
[20] F. Cristian. Understanding fault-tolerant distributed systems. Communications of the ACM, 34(2), 1991.
[21] S. B. Davidson. Optimism and consistency in partitioned database systems. ACM Transactions on Database Systems, 9(3), 1984.
[22] S. B. Davidson, H. Garcia-Molina, and D. Skeen. Consistency in partitioned networks. ACM Computing Surveys, 17(3), 1985.
[23] A. Fekete, N. Lynch, and A. Shvartsman. Specifying and using a partitionable group communication service. In 16th Annual ACM Symposium on Principles of Distributed Computing, 1997.
[24] M. J. Franklin, M. J. Carey, and M. Livny. Transactional client-server cache consistency: Alternatives and performance. ACM Transactions on Database Systems, 22(3), 1997.
[25] D. K. Gifford. Violet: An experimental decentralized system. ACM Operating Systems Review, 13(5), 1979.
[26] D. K. Gifford. Weighted voting for replicated data. In 7th ACM Symposium on Operating System Principles, 1979.
[27] J. Gray. Notes on database operating systems. In R. Bayer, R. M. Graham, and G. Seegmüller, editors, Operating Systems: An Advanced Course, volume 60 of Lecture Notes in Computer Science. Springer-Verlag, 1978.
[28] Object Management Group. The common object request broker: Architecture and specification, 1995. Revision 2.0.
[29] T. Harder. Observations on optimistic concurrency control. Information Systems, 9(2), 1984.
[30] T. Harder and A. Reuter. Principles of transaction-oriented database recovery. ACM Computing Surveys, 15(4), 1983.
[31] M. Herlihy. A quorum-consensus replication method for abstract data types. ACM Transactions on Computer Systems, 4(1), 1986.
[32] N. Hutchinson and M. Dumont. Building replication and transactions into Emerald. Personal copy, 1997.
[33] A. D. Joseph, A. F. deLespinasse, J. A. Tauber, D. K. Gifford, and M. F. Kaashoek. Rover: A toolkit for mobile information access. In 15th ACM Symposium on Operating System Principles, 1995.
[34] A. D. Joseph and M. F. Kaashoek. Building reliable mobile-aware applications using the Rover toolkit. In Proceedings of the 2nd ACM International Conference on Mobile Computing and Networking, 1996.
[35] E. Jul, N. Hutchinson, and A. Black. Fine-grained mobility in the Emerald system. ACM Transactions on Computer Systems, 6(1), 1988.
[36] F. Kaashoek, A. Tanenbaum, S. Flynn Hummel, and H. Bal. An efficient reliable broadcast protocol. ACM Operating Systems Review, 23(4), 1989.
[37] R. Koo and S. Toueg. Checkpointing and rollback recovery for distributed systems. IEEE Transactions on Software Engineering, 13(1), 1987.
[38] N. Kronenberg, H. Levy, and W. Strecker. VAXclusters: A closely-coupled distributed system. ACM Transactions on Computer Systems, 4(2), 1986.
[39] H. T. Kung and J. T. Robinson. Optimistic methods for concurrency control. ACM Transactions on Database Systems, 6(2), 1981.
[40] R. Ladin, B. Liskov, L. Shrira, and S. Ghemawat. Providing availability using lazy replication. ACM Transactions on Computer Systems, 10(4), 1992.
[41] L. Lamport. Time, clocks and the ordering of events in a distributed system. Communications of the ACM, 21(7), 1978.
[42] B. Liskov. Distributed programming in Argus. Communications of the ACM, 31(3), 1988.
[43] B. Liskov, S. Ghemawat, R. Gruber, P. Johnson, and L. Shrira. Replication in the Harp file system. In 13th ACM Symposium on Operating System Principles, 1991.
[44] D. E. Lowell and P. M. Chen. Free transactions with Rio Vista. In 16th ACM Symposium on Operating System Principles, 1997.
[45] W. Lux. Adaptable object migration: Concept and implementation. ACM SIGOPS Operating Systems Review, 29(2), 1995.
[46] S. Maffeis. Adding group communication and fault-tolerance to CORBA. In USENIX Conference on Object-Oriented Technologies, 1995.
[47] L. E. Moser, P. M. Melliar-Smith, and D. Agrawal. Totem: A fault-tolerant multicast group communication service. Communications of the ACM, 39(4), 1996.
[48] P. Narasimhan, L. E. Moser, and P. M. Melliar-Smith. Exploiting the Internet Inter-ORB Protocol interface to provide CORBA with fault tolerance. In 3rd USENIX Conference on Object-Oriented Technologies, 1997.
[49] M. Raynal. About logical clocks for distributed systems. ACM Operating Systems Review, 26(1), 1992.
[50] D. P. Reed. Implementing atomic actions on decentralized data. ACM Transactions on Computer Systems, 1(1), 1983.
[51] G. Ricart and A. K. Agrawala. An optimal algorithm for mutual exclusion in computer networks. Communications of the ACM, 24(1), 1981.
[52] P. V. Roy, S. Haridi, and P. Brand. Mobile objects in Distributed Oz. ACM Transactions on Programming Languages and Systems, 19(5), 1997.
[53] M. Satyanarayanan, J. J. Kistler, P. Kumar, M. E. Okasaki, E. H. Siegel, and D. C. Steere. Coda: A highly available file system for a distributed workstation environment. IEEE Transactions on Computers, 39(4), 1990.
[54] G. Schlageter. Problems of optimistic concurrency control in distributed database systems. SIGMOD Record, 13(3), 1982.
[55] R. D. Schlichting and V. T. Thomas. Programming language support for writing fault-tolerant distributed software. IEEE Transactions on Computers, 44(2), 1995.
[56] F. Schneider. Implementing fault-tolerant services using the state machine approach: A tutorial. ACM Computing Surveys, 22(4), 1990.
[57] M. Shapiro, P. Gautron, and L. Mosseri. Persistence and migration for C++ objects. In Proceedings of the European Conference on Object-Oriented Programming (ECOOP'89), 1989.
[58] M. Shapiro, Y. Gourhant, S. Habert, L. Mosseri, M. Ruffin, and C. Valot. SOS: An object-oriented operating system - assessment and perspectives. Computing Systems, 2(4), 1989.
[59] S. Shrivastava, G. N. Dixon, and G. D. Parrington. An overview of the Arjuna distributed programming system. IEEE Software, January 1991.
[60] A. Silberschatz, J. Peterson, and P. Galvin. Operating System Concepts. McGraw-Hill, fourth edition, 1991.
[61] M. Sinha and N. Natarajan. A priority based distributed deadlock detection algorithm. IEEE Transactions on Software Engineering, 11(1), 1985.
[62] R. E. Strom and S. Yemini. Optimistic recovery in distributed systems. ACM Transactions on Computer Systems, 3(3), 1985.
[63] R. van Renesse, K. P. Birman, and S. Maffeis. Horus: A flexible group communication system. Communications of the ACM, 39(4), 1996.
[64] Y.-M. Wang. Consistent global checkpoints that contain a given set of local checkpoints. IEEE Transactions on Computers, 13(1), 1997.
[65] G. Weikum. Principles and realization strategies of multilevel transaction management. ACM Transactions on Database Systems, 16(1), 1991.

