Mutex to Read From Message Queue Linux
## Goals of concurrent program design Now is a good fourth dimension to pop up a level and expect at what we're doing. Retrieve that our primary goals are to create software that is **prophylactic from bugs**, **like shooting fish in a barrel to understand**, and **ready for change**. Building concurrent software is clearly a claiming for all iii of these goals. We can break the issues into two full general classes. When we enquire whether a concurrent plan is *safe from bugs*, we intendance about two backdrop: + **Safety.** Does the concurrent program satisfy its invariants and its specifications? Races in accessing mutable data threaten safety. Condom asks the question: can you prove that **some bad matter never happens**? + **Liveness.** Does the program keep running and eventually practise what you lot want, or does it go stuck somewhere waiting forever for events that volition never happen? Can you testify that **some skillful matter eventually happens**? Deadlocks threaten liveness. Liveness may likewise crave *fairness*, which means that concurrent modules are given processing chapters to make progress on their computations. Fairness is mostly a matter for the operating system's thread scheduler, just you can influence information technology (for practiced or for sick) by setting thread priorities. ## Bulletin passing with threads Nosotros've previously talked about *message passing* between processes: [clients and servers communicating over network sockets](../../nineteen-sockets-networking). We can too use bulletin passing between threads within the aforementioned process, and this design is often preferable to a shared retentivity design with locks. Utilize a synchronized queue for bulletin passing between threads. The queue serves the same function as the buffered network communication channel in client/server message passing. Coffee provides the [`BlockingQueue`](java:java/util/concurrent/BlockingQueue) interface for queues with blocking operations: In an ordinary [`Queue`](java:coffee/util/Queue): + **`add(eastward)`** adds element `east` to the end of the queue. + **`remove()`** removes and returns the element at the caput of the queue, or throws an exception if the queue is empty. A synchronized queue, co-ordinate to the Java API documentation: > additionally supports operations that wait for the queue to become non-empty when retrieving an element, and await for space to become available in the queue when storing an element. + **`put(e)`** *blocks* until it can add element `e` to the end of the queue (if the queue does non have a size leap, `put` will not block). + **`take()`** *blocks* until it can remove and return the chemical element at the head of the queue, waiting until the queue is non-empty.
Analogous to the client/server design for message passing over a network is the **producer-consumer design pattern** for message passing between threads. Producer threads and consumer threads share a synchronized queue. Producers put information or requests onto the queue, and consumers remove and procedure them. One or more producers and ane or more than consumers might all be adding and removing items from the same queue. This queue must exist safe for concurrency. Java provides two implementations of `BlockingQueue`: + [`ArrayBlockingQueue`](java:java/util/concurrent/ArrayBlockingQueue) is a stock-still-size queue that uses an assortment representation. `put`ting a new particular on the queue volition block if the queue is full. + [`LinkedBlockingQueue`](java:coffee/util/concurrent/LinkedBlockingQueue) is a growable queue using a linked-list representation. If no maximum chapters is specified, the queue volition never fill up, so `put` will never block. Different the streams of bytes sent and received past sockets, these synchronized queues (similar normal collections classes in Java) can agree objects of an capricious type. Instead of designing a wire protocol, nosotros must choose or design a blazon for messages in the queue. And just as we did with operations on a threadsafe ADT or messages in a wire protocol, nosotros must design our messages here to prevent race conditions and enable clients to perform the atomic operations they need. ### Banking company account case
Our first instance of bulletin passing was the [bank account example](../../17-concurrency/#message_passing_example). Each cash car and each account is its ain module, and modules interact by sending messages to i another. Incoming messages make it on a queue. Nosotros designed messages for `get-residual` and `withdraw`, and said that each cash machine checks the account residuum earlier withdrawing to prevent overdrafts: ``` become-balance if residuum >= ane then withdraw one ``` But it is all the same possible to interleave messages from 2 greenbacks machines so they are both fooled into thinking they tin can safely withdraw the last dollar from an account with only $1 in information technology. We demand to choose a better diminutive operation: `withdraw-if-sufficient-funds` would be a improve functioning than just `withdraw`. ## Implementing message passing with queues You lot tin come across all the code for this example on GitHub: [**squarer example**](https://github.com/mit6005/F14-EX20-square). All the relevant parts are excerpted below. Here's a message passing module for squaring integers: ```java /** Squares integers. */ public form Squarer { individual final BlockingQueue<Integer> in; private final BlockingQueue<SquareResult> out; // Rep invariant: in, out != cypher /** Make a new squarer. * @param requests queue to receive requests from * @param replies queue to send replies to */ public Squarer(BlockingQueue<Integer> requests, BlockingQueue<SquareResult> replies) { this.in = requests; this.out = replies; } /** Beginning handling squaring requests. */ public void commencement() { new Thread(new Runnable() { public void run() { while (true) { // TODO: we may desire a fashion to stop the thread try { // block until a request arrives int x = in.accept(); // compute the answer and ship information technology dorsum int y = x * x; out.put(new SquareResult(x, y)); } catch (InterruptedException ie) { ie.printStackTrace(); } } } }).first(); } } ``` Incoming messages to the `Squarer` are integers; the squarer knows that its job is to square those numbers, and then no further details are required. Outgoing letters are instances of `SquareResult`: ```java /** A squaring upshot message. */ public form SquareResult { individual last int input; individual final int output; /** Make a new result message. * @param input input number * @param output square of input */ public SquareResult(int input, int output) { this.input = input; this.output = output; } @Override public String toString() { return input + "^two = " + output; } } ``` We would probably add additional observers to `SquareResult` and so clients can retrieve the input number and output result. Finally, here's a chief method that uses the squarer: ```java public static void chief(String[] args) { BlockingQueue<Integer> requests = new LinkedBlockingQueue<>(); BlockingQueue<SquareResult> replies = new LinkedBlockingQueue<>(); Squarer squarer = new Squarer(requests, replies); squarer.start(); endeavor { // make a asking requests.put(42); // ... perhaps do something meantime ... // read the reply System.out.println(replies.take()); } catch (InterruptedException ie) { ie.printStackTrace(); } } ``` It should not surprise us that this code has a very similar flavor to the lawmaking for implementing message passing with sockets. ### Stopping What if we want to shut downwards the `Squarer` then it is no longer waiting for new inputs? In the client/server model, if we desire the customer or server to stop listening for our messages, nosotros shut the socket. And if we want the client or server to end altogether, nosotros tin can quit that procedure. But hither, the squarer is just another thread in the *same* process, and we tin can't "shut" a queue. One strategy is a *poison pill*: a special message on the queue that signals the consumer of that message to stop its piece of work. To close down the squarer, since its input messages are merely integers, we would accept to choose a magic poison integer (everyone knows the foursquare of 0 is 0 right? no one volition demand to ask for the square of 0...) or use null (don't use null). Instead, we might modify the type of elements on the requests queue to an ADT: SquareRequest = IntegerRequest + StopRequest with operations: input : SquareRequest → int shouldStop : SquareRequest → boolean and when we want to stop the squarer, nosotros enqueue a `StopRequest` where `shouldStop` returns `true`. For example, in `Squarer.start()`: ```java public void run() { while (true) { try { // cake until a asking arrives SquareRequest req = in.take(); // meet if we should end if (req.shouldStop()) { break; } // compute the reply and send information technology back int ten = req.input(); int y = x * 10; out.put(new SquareResult(x, y)); } catch (InterruptedException ie) { ie.printStackTrace(); } } } ``` Information technology is also possible to *interrupt* a thread by calling its `interrupt()` method. If the thread is blocked waiting, the method it's blocked in will throw an `InterruptedException` (that's why we have to endeavor-take hold of that exception almost whatever time we call a blocking method). If the thread was not blocked, an *interrupted* flag will exist set. The thread must check for this flag to encounter whether it should stop working. For example: ```coffee public void run() { // handle requests until nosotros are interrupted while ( ! Thread.interrupted()) { try { // block until a request arrives int x = in.take(); // compute the respond and send it back int y = 10 * x; out.put(new SquareResult(x, y)); } catch (InterruptedException ie) { // end interruption; } } } ``` ## Thread prophylactic arguments with message passing A thread safety argument with message passing might rely on: + **Existing threadsafe information types** for the synchronized queue. This queue is definitely shared and definitely mutable, so we must ensure information technology is safety for concurrency. + **Immutability** of letters or information that might be accessible to multiple threads at the same time. + **Confinement** of data to individual producer/consumer threads. Local variables used by one producer or consumer are not visible to other threads, which but communicate with one another using messages in the queue. + **Confinement** of mutable messages or data that are sent over the queue but volition only be attainable to ane thread at a time. This argument must exist carefully articulated and implemented. But if one module drops all references to some mutable information like a hot potato equally soon as it puts them onto a queue to be delivered to some other thread, only one thread will have access to those data at a fourth dimension, precluding concurrent access. ## Message passing deadlock When buffers make full up, message passing systems can experience deadlock. In a deadlock, 2 (or more) concurrent modules are both blocked waiting for each other to do something. Since they're blocked, no module will be able to make anything happen, and none of them will suspension the deadlock. In full general, in a system of multiple concurrent modules communicating with each other, nosotros can imagine drawing a graph in which the nodes of the graph are modules, and there'southward an edge from A to B if module A is blocked waiting for module B to do something. The system is deadlocked if at some betoken in time, there is a *cycle* in this graph. The simplest instance is the two-node deadlock, A → B and B → A, but more complex systems tin can achieve (not that they'll be happy about information technology) larger deadlocks. A bulletin passing arrangement in deadlock appears to but hang, just the way a deadlock with locks does. ### Example: network square server Allow's see an example of message passing deadlock. We'll use a network version of the foursquare server, so you can compare their implementations. All you demand to pay attention to is the spec of the server: the messages it receives and sends. ```coffee /** * SquareServer is a server that squares integers passed to information technology. * It accepts requests of the form: * Request ::= Number "\n" * Number ::= [0-9]+ * and for each request, returns a respond of the form: * Respond ::= (Number | "err") "\n" * where a Number is the square of the request number, * or "err" is used to indicate a misformatted request. * SquareServer tin handle only 1 client at a fourth dimension. */ public class SquareServer { /** Default port number where the server listens for connections. */ public static final int SQUARE_PORT = 4949; private ServerSocket serverSocket; // Rep invariant: serverSocket != null /** Make a SquareServer that listens for connections on port. * @param port port number, requires 0 <= port <= 65535 */ public SquareServer(int port) throws IOException { serverSocket = new ServerSocket(port); } /** Run the server, listening for connections and treatment them. * @throws IOException if the main server socket is broken */ public void serve() throws IOException { while (true) { // block until a client connects Socket socket = serverSocket.accept(); attempt { handle(socket); } grab (IOException ioe) { ioe.printStackTrace(); // but don't stop serve() } finally { socket.close(); } } } /** Handle 1 client connectedness. Returns when client disconnects. * @param socket socket where customer is connected * @throws IOException if connexion encounters an error */ private void handle(Socket socket) throws IOException { System.err.println("client connected"); // get the socket'southward input stream, and wrap converters around it // that catechumen it from a byte stream to a character stream, // and that buffer it so that we tin can read a line at a time BufferedReader in = new BufferedReader(new InputStreamReader( socket.getInputStream())); // similarly, wrap graphic symbol=>bytestream converter around the // socket output stream, and wrap a PrintWriter around that and then // that we have more convenient means to write Coffee archaic // types to it. PrintWriter out = new PrintWriter(new OutputStreamWriter( socket.getOutputStream())); try { // each request is a unmarried line containing a number for (Cord line = in.readLine(); line != null; line = in.readLine()) { Organisation.err.println("asking: " + line); try { int x = Integer.valueOf(line); // compute answer and transport dorsum to customer int y = x * x; System.err.println("reply: " + y); out.print(y + "\due north"); } catch (NumberFormatException eastward) { // complain about ill-formatted request System.err.println("reply: err"); out.println("err"); } // important! flush our buffer then the respond is sent out.affluent(); } } finally { out.close(); in.close(); } } /** Start a SquareServer running on the default port. */ public static void principal(String[] args) { try { SquareServer server = new SquareServer(SQUARE_PORT); server.serve(); } catch (IOException east) { due east.printStackTrace(); } } } ``` Hither'due south a client for our number-squaring protocol; of course we could also talk to the server in a program similar Telnet: ```java /** * SquareClient is a client that sends requests to the SquareServer * and interprets its replies. * A new SquareClient is "open" until the close() method is chosen, * at which point it is "closed" and may not be used further. */ public class SquareClient { private Socket socket; private BufferedReader in; private PrintWriter out; // Rep invariant: socket, in, out != null /** Make a SquareClient and connect information technology to a server running on * hostname at the specified port. * @throws IOException if can't connect */ public SquareClient(String hostname, int port) throws IOException { socket = new Socket(hostname, port); in = new BufferedReader(new InputStreamReader( socket.getInputStream())); out = new PrintWriter(new OutputStreamWriter( socket.getOutputStream())); } /** Send a asking to the server. Requires this is "open". * @param x number to foursquare * @throws IOException if network or server failure */ public void sendRequest(int x) throws IOException { out.print(x + "\n"); out.flush(); // important! brand certain ten really gets sent } /** Get a reply from the adjacent request that was submitted. * Requires this is "open up". * @return square of requested number * @throws IOException if network or server failure */ public int getReply() throws IOException { Cord reply = in.readLine(); if (reply == nada) { throw new IOException("connection terminated unexpectedly"); } endeavour { render Integer.valueOf(reply); } catch (NumberFormatException nfe) { throw new IOException("misformatted reply: " + reply); } } /** Closes the client'southward connection to the server. * This client is now "closed". Requires this is "open". * @throws IOException if close fails */ public void close() throws IOException { in.close(); out.close(); socket.close(); } } ``` Finally, here's the cleaved part: a plan that uses `SquareClient` to talk to the `SquareServer`: ```java individual static final int N = 100; /** Use a SquareServer to square all the integers from 1 to N. */ public static void primary(String[] args) throws IOException { SquareClient client = new SquareClient("localhost", SquareServer.SQUARE_PORT); // ship the requests to foursquare 1...N for (int x = 1; x <= N; ++x) { customer.sendRequest(10); Organization.out.println(x + "^ii = ?"); } // collect the replies for (int 10 = 1; x <= N; ++10) { int y = customer.getReply(); System.out.println(ten + "^ii = " + y); } customer.close(); } ``` Can you smell danger in the air? As `Northward` grows larger and larger, our customer is making many requests to the `SquareServer` *without reading whatsoever replies*. Eventually the client'southward receive buffer volition make full upward with unread replies. Then the server's sending buffer will fill, since data can't be sent successfully. Finally, i of its calls to `out.print` will block, waiting for more buffer infinite --- and we accept our mortiferous embrace. The server is waiting for the client to read some replies, simply the client is waiting for the server to accept its requests. Deadlock. ### Concluding suggestions for preventing deadlock 1 solution to deadlock is to design the system then that there is no possibility of a cycle --- so that if A is waiting for B, information technology cannot exist that B was already (or will get-go) waiting for A. Some other arroyo to deadlock is *timeouts*. If a module has been blocked for too long (maybe 100 milliseconds? or 10 seconds? how to make up one's mind?), then you end blocking and throw an exception. Now the problem becomes: what do you do when that exception is thrown? mitx:1a562ed19fac4b0eae5a4e2129e6785b Message passing ## Next: [Concurrency in practice, and a summary](../#concurrency_in_practice)
Source: https://web.mit.edu/6.005/www/fa14/classes/20-queues-locks/message-passing/
0 Response to "Mutex to Read From Message Queue Linux"
Post a Comment