r1 - 09 Jun 2006 - 15:47:29 - TimPeterson

Spiegel Streaming Functions

In Spiegel, the most basic way to transfer data between Functions is to send an object containing all of the relevant data through a connection to another Function. The transfer occurs once per update and must include all of the data to be transferred. Sometimes, however, the data is simply a list of objects of the same type, each of which receives the same processing. In that case it can be more efficient to send the objects between Functions one at a time as they are processed, rather than buffering them until the whole list is complete and then sending it all at once. This allows several Functions in the pipeline to work on the data simultaneously, which can increase throughput. This technique is called streaming, because data flows continuously from one Function to the next.

Normally a Function's update() method retrieves data from its inputs, processes it, and sends the results to its outputs. The Functions connected to those outputs then have their update() methods called, and the process repeats. With streaming, the same actions occur, only with different timing. At the beginning of the update() method, an ObjectPipe object is created; this object implements the pipeline between the two Functions. The pipe is immediately sent to the output. The method then proceeds to process its input data, writing each item to the pipe as soon as it has been processed. The pipe ensures that everything written to it reaches its destination(s). When all items have been written, the pipe is closed, signalling that no more data will follow, and the first update() method exits.
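The sequence above can be sketched with standard Java concurrency classes. This is not Spiegel's ObjectPipe. The sketch assumes a single producer and a single consumer: a java.util.concurrent.ArrayBlockingQueue stands in for the pipe, a hypothetical END sentinel stands in for close(), and the two update() methods are modeled as the calling thread and a separate consumer thread. The class name StreamingSequenceSketch and the capacity of 4 are illustrative choices only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class StreamingSequenceSketch {
    // Hypothetical end-of-stream marker standing in for pipe.close()
    private static final Object END = new Object();

    /** Models one producer update() streaming items to one consumer update(). */
    public static List<Object> run(List<Object> inputs) throws InterruptedException {
        // 1. Create the "pipe" before any processing starts (capacity 4 is arbitrary)
        BlockingQueue<Object> pipe = new ArrayBlockingQueue<>(4);
        List<Object> received = new ArrayList<>();

        // 2. "Sending" the pipe downstream starts the consumer in a separate thread
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Object obj = pipe.take();  // blocks while the pipe is empty
                    if (obj == END) break;     // the producer has closed the pipe
                    received.add(obj);         // "process" the item
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // 3. The producer writes each item as soon as it has been processed
        for (Object in : inputs) {
            pipe.put("processed:" + in);       // blocks while the pipe is full
        }
        // 4. Close the pipe; the producer's update() would return here
        pipe.put(END);

        consumer.join();  // sketch only: wait so the result can be inspected
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Object> inputs = new ArrayList<>();
        for (int i = 0; i < 10; i++) inputs.add(i);
        System.out.println(run(inputs));
    }
}
```

Running main prints the ten processed items in their original order. Unlike a real producer, whose update() method simply returns after closing the pipe, the sketch joins the consumer thread so that the result can be checked.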

Sending the ObjectPipe object at the beginning of the producer's update() method triggers a call to the next Function's update() method, just as if the complete data object had been sent. This method runs concurrently, in a separate thread from the first. It begins by retrieving the ObjectPipe object from its input. Now that both simultaneously executing update() methods hold references to the same object, they can communicate. The second Function enters a loop in which it repeatedly reads items from the pipe. Items written by the producing Function are queued in the pipe until the consumer requests them. The pipe has a maximum capacity, set when it is created; once the pipe is full, further writes block until room becomes available. Likewise, a read blocks while the pipe is empty. In this way the producer and consumer Functions can process items concurrently. When the producer has finished sending data, it closes the pipe; once the consumer has read the remaining items, the pipe notifies it that the stream is complete.
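The blocking rules described above can be illustrated with a minimal monitor-based queue. Writes wait while the pipe is full, reads wait while it is empty, and closing the pipe ends the stream only after the remaining items have been read. BoundedPipe is a hypothetical single-consumer class, not the ObjectPipe API. In particular, signalling end of stream by returning null from read() is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical single-consumer pipe illustrating the blocking rules; not Spiegel's ObjectPipe. */
public class BoundedPipe<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;      // maximum number of queued items, fixed at creation
    private boolean closed = false;

    public BoundedPipe(int capacity) {
        if (capacity < 1) throw new IllegalArgumentException("capacity must be at least 1");
        this.capacity = capacity;
    }

    /** Producer side: blocks while the pipe is full. ArrayDeque rejects null items. */
    public synchronized void write(T item) throws InterruptedException {
        while (items.size() >= capacity && !closed) wait();
        if (closed) throw new IllegalStateException("pipe is closed");
        items.addLast(item);
        notifyAll();                 // wake a consumer waiting on an empty pipe
    }

    /** Producer side: signals that no more items will be written. */
    public synchronized void close() {
        closed = true;
        notifyAll();                 // wake a waiting consumer so it can see the end of the stream
    }

    /** Consumer side: blocks while the pipe is empty and open; returns null once closed and drained. */
    public synchronized T read() throws InterruptedException {
        while (items.isEmpty() && !closed) wait();
        if (items.isEmpty()) return null;   // closed and fully drained: end of stream
        T item = items.removeFirst();
        notifyAll();                 // wake a producer waiting on a full pipe
        return item;
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedPipe<Integer> pipe = new BoundedPipe<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) pipe.write(i);  // stalls whenever 2 items are waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                pipe.close();        // always close so the consumer loop can terminate
            }
        });
        producer.start();

        int sum = 0;
        for (Integer x = pipe.read(); x != null; x = pipe.read()) sum += x;
        producer.join();
        System.out.println(sum);
    }
}
```

With a capacity of 2, the producer thread in main repeatedly stalls until the consumer catches up. The consumer still receives all ten items and prints their sum, 55.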

The implementation of this sequence of events is fairly simple. Here is a sample skeleton for a producer using batch processing:

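public void update() {
    // Buffer every generated object until the loop has finished
    List<Object> objects = new ArrayList<Object>();

    for(...) {
        Object obj;

        // Generate an object

        objects.add(obj);
    }

    // Send the complete list downstream in a single transfer
    output.set(objects);
}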

Here is the corresponding batch-processing consumer:

public void update() {
    // Retrieve the complete list sent by the producer
    List objects = input.get();

    for(Object obj : objects) {
        // Process the object
    }
}

To use streaming processing, the producer is changed as follows:

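public void update() {
    // Create the pipe and send it downstream immediately; this triggers
    // the consumers' update() methods, which run in separate threads
    ObjectPipe pipe = new ObjectPipe(output);
    output.set(pipe);

    try {
        for(...) {
            Object obj;

            // Generate an object

            pipe.write(obj);  // blocks while the pipe is full
        }
    } finally {
        // Always close the pipe so that consumers are not left waiting
        pipe.close();
    }
}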

The consumer changes as follows:

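public void update() {
    // Retrieve the pipe that the producer sent at the start of its update()
    ObjectPipe pipe = input.get();

    // Identify this reader by its input; the loop blocks while the pipe is
    // empty and ends once the pipe has been closed and drained
    for(Object obj : pipe.reader(input)) {
        // Process the object
    }
}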

A Function's output can be connected to inputs on several other Functions, so the ObjectPipe creates one internal queue per consumer to keep track of each consumer's current position in the stream. For this reason, the consumer must tell the pipe which Function is reading, by passing a reference to the input being read. To initialize these queues, the pipe must also know how many consumers it serves, which is why the output is passed to the ObjectPipe constructor.
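A rough sketch of this per-consumer bookkeeping follows. FanOutPipe is hypothetical. It takes the number of consumers as a constructor argument, whereas ObjectPipe obtains that number from the output. Each reader identifies itself by an integer index, whereas ObjectPipe expects a reference to the input. Each write places the item on every consumer's bounded queue, so each consumer advances through the stream independently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical multi-consumer pipe: one bounded queue per connected consumer. */
public class FanOutPipe {
    private static final Object END = new Object();   // end-of-stream marker
    private final List<BlockingQueue<Object>> queues = new ArrayList<>();
    private final boolean[] finished;                  // per-consumer end-of-stream flag

    /** consumerCount stands in for the number of inputs connected to the output. */
    public FanOutPipe(int consumerCount, int capacity) {
        for (int i = 0; i < consumerCount; i++) queues.add(new ArrayBlockingQueue<>(capacity));
        finished = new boolean[consumerCount];
    }

    /** Puts the item on every consumer's queue; blocks while any of those queues is full. */
    public void write(Object item) throws InterruptedException {
        for (BlockingQueue<Object> q : queues) q.put(item);
    }

    /** Marks the end of the stream for every consumer. */
    public void close() throws InterruptedException {
        for (BlockingQueue<Object> q : queues) q.put(END);
    }

    /**
     * The caller identifies itself (here by an index rather than an input reference)
     * so that it advances only its own position. Returns null at end of stream.
     */
    public Object read(int consumer) throws InterruptedException {
        if (finished[consumer]) return null;
        Object obj = queues.get(consumer).take();
        if (obj == END) {
            finished[consumer] = true;
            return null;
        }
        // The pipe keeps referencing this item until every consumer's queue has handed it out
        return obj;
    }

    public static void main(String[] args) throws InterruptedException {
        FanOutPipe pipe = new FanOutPipe(2, 3);
        int[] sums = new int[2];
        List<Thread> consumers = new ArrayList<>();
        for (int c = 0; c < 2; c++) {
            final int id = c;
            Thread t = new Thread(() -> {
                try {
                    for (Object o = pipe.read(id); o != null; o = pipe.read(id)) sums[id] += (Integer) o;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers.add(t);
            t.start();
        }
        for (int i = 1; i <= 5; i++) pipe.write(i);
        pipe.close();
        for (Thread t : consumers) t.join();
        System.out.println(sums[0] + " " + sums[1]);
    }
}
```

Because write() blocks on whichever queue is full, the slowest consumer in this sketch limits how far ahead the producer can get. In main, both consumer threads see all five items and print `15 15`.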

Normally, the data produced on an output is stored until the output is next updated, so an output need not be recomputed unless its inputs have changed. With streaming, however, the data is not stored: as soon as an item has been consumed by all consumers, it is discarded. Although the pipe object remains attached to the output, it is of no further use to the connected Functions once it has been drained. If a consumer needs to update, the data must be obtained afresh from upstream. Therefore, when a Function that has read from a pipe is invalidated, the invalidation is propagated back up the pipe to the producer. This ensures that the pipe will be reset and refilled for the updating Function.
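The following sketch models why invalidation has to travel upstream. StreamProducer, StreamConsumer and the runs counter are hypothetical. A single-pass Iterator stands in for the drained pipe, and creating a fresh Iterator stands in for resetting and refilling the pipe. Calling update() a second time without invalidation finds the stream already drained, because nothing was stored. Invalidating the consumer makes the producer run again.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical producer whose streamed output can be read only once. */
class StreamProducer {
    private final List<Integer> source;
    private Iterator<Integer> stream;   // single-pass, like a drained pipe
    private boolean valid = false;
    int runs = 0;                       // how many times the stream has been (re)filled

    StreamProducer(List<Integer> source) { this.source = source; }

    /** Invalidation arriving from downstream: the old stream is no longer usable. */
    void invalidate() { valid = false; }

    /** Regenerates the stream only when the producer has been invalidated. */
    Iterator<Integer> output() {
        if (!valid) {
            stream = source.iterator();  // reset and refill: a fresh pass over the data
            valid = true;
            runs++;
        }
        return stream;
    }
}

/** Hypothetical consumer that reads from the producer's stream. */
class StreamConsumer {
    private final StreamProducer upstream;

    StreamConsumer(StreamProducer upstream) { this.upstream = upstream; }

    /** Invalidating a pipe reader propagates the invalidation back to its producer. */
    void invalidate() { upstream.invalidate(); }

    int update() {
        int sum = 0;
        Iterator<Integer> it = upstream.output();
        while (it.hasNext()) sum += it.next();
        return sum;
    }
}

public class InvalidationSketch {
    public static void main(String[] args) {
        StreamProducer producer = new StreamProducer(Arrays.asList(1, 2, 3));
        StreamConsumer consumer = new StreamConsumer(producer);
        System.out.println(consumer.update());  // 6: the stream is consumed
        System.out.println(consumer.update());  // 0: nothing was stored, the stream is empty
        consumer.invalidate();                  // propagated up to the producer
        System.out.println(consumer.update());  // 6: the producer refilled the stream
        System.out.println(producer.runs);      // 2
    }
}
```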

-- TimPeterson - 09 Jun 2006
