Issue 21

Tick Tock on Beanstalkd Message Queues

Tudor Mărghidanu
Software Architect
@Yardi România
PROGRAMMING

Time is a restrictive dimension in general, and all the more so in IT, where every product, regardless of its stage of development, is measured against it. Developers divide time into different categories, and the resources allocated to a project are mainly aimed at using that time more efficiently during development. In this article, I will only talk about the time an application spends executing during a given session.

I'm sure many of you are familiar with the notion of message queues, especially if you've had the opportunity to work on applications that rely on asynchronous operations. Message queues offer a series of undeniable advantages, such as:

  • Decoupling - The application's logic is separated into independent parts
  • Scalability - More clients can process data at the same time
  • Redundancy - Failed jobs are not lost and can be reprocessed

Several services implement message queues, but in this article I will only talk about Beanstalkd.

Beanstalkd

Beanstalkd is a work queue service with a generic interface, developed to reduce latency in applications by running time-consuming tasks asynchronously. Thanks to that generic interface, it can become a major scalability factor in the application being developed. Beanstalkd imposes no constraints on the implementation language or the serialization (marshalling) format, because clients talk to it over plain TCP sockets using a very simple text protocol. I will explain some of the more important terms in the Beanstalkd terminology so that you get a better view of what the rest of the article focuses on:

  • Job: The message itself, serialized however you choose or however the client library you use does it.
  • Tube: The namespace used for a queue. Beanstalkd supports multiple tubes at the same time.
  • Producer: The process that puts messages on the tubes.
  • Consumer: The process that takes the messages queued on one or more tubes and processes them.
  • Operations: The Producer or the Consumer can perform the following operations on the jobs posted on the tubes:
    • put
    • reserve
    • delete
    • release
    • bury

Problem

I believe that a good way to learn is through examples, so I came up with a problem for this article:

"Build a web application where the users may upload video files in various formats, so that they would be available for display on one of the application's pages."

This statement matters because it is vague enough to let the problem grow, and a growing problem demands a solution that can scale with it. The beauty of that solution lies in Beanstalkd's simplicity.

Once the client is implemented, its execution can be scaled both vertically (more system processes on the same machine) and horizontally (more system processes on different machines), using the same initial rule.

The figure illustrates the data flow within the application and the way in which the web application should interact with the users, by using a layer of shared storage.

Think about a situation where users upload a set of video files on a given page. Each video enters a pre-defined process that fulfills two major functions: the first stores the file in a persistence layer (a distributed file system or a database); the second prepares and queues a message that references the stored file. From this point on, the operation becomes asynchronous and distributed; if the ratio between the number of consumers and the rate of incoming data has been chosen correctly, the uploaded files should be processed in a short time.

package MyApp::Globals;
# class_has is assumed to come from MooseX::ClassAttribute.
use Moose;
use MooseX::ClassAttribute;
# ... More static properties ...
use JSON::XS;
use Beanstalk::Client;

class_has "message_queue" => (
	is      => "ro",
	isa     => "Beanstalk::Client",
	default => sub {
		return Beanstalk::Client->new(
			{
				# NOTE: This usually should come from a configuration file...
				server => "localhost",
				# Making sure we serialize/deserialize via JSON.
				encoder => sub { encode_json( shift() ); },
				decoder => sub { decode_json( shift() ); },
			}
		);
	}
);
package MyApp::Web::Controllers::Videos;
# ...
sub upload {
	my $self = shift();

	# Retrieving the uploaded video.
	my $video = $self->req()->upload( "video" );

	# Additional content and headers validation ...

	# Storing the video in the persistence layer ...
	my $object = MyApp::Globals->context()
		->dfs()->raw_videos(
			{
				filename => $video->filename(),
				headers  => $video->headers()->to_hash(),
				data     => $video->slurp(),
				# ... additional user data
			}
		);

	# Making sure we use the right tube for sending the data.
	MyApp::Globals->message_queue()
		->use( "raw_videos" );

	# Storing the data in the queue. The payload is passed after the
	# options so that the configured encoder serializes it; with
	# Beanstalk::Client, a value given under the "data" key is sent
	# without being encoded.
	MyApp::Globals->message_queue()
		->put(
			{ priority => 10000 },
			$object->pack(),
		);
}

The consumers work as fast as they can, requesting a new message from Beanstalkd as soon as they finish processing the previous one. The state of each job changes as we go along: it is reserved while a consumer works on it, deleted when processing succeeds, and buried when it fails. In this way, we can track how many jobs were processed correctly and how many failed. Once the underlying problem has been solved, the buried jobs can be kicked back into the ready queue and processed again.

Another important aspect is that the consumers can be run in parallel as ordinary system processes. This makes them considerably easier to manage, avoids locking on shared in-process resources, and limits the impact of memory leaks, since a leaking process can simply be restarted.

# Getting messages only from these tubes ...
MyApp::Globals->message_queue()
	->watch_only( "raw_videos" );

while( 1 ) {
	# Retrieving a job from the message queue and
	# marking it as reserved...
	my $job = MyApp::Globals->message_queue()
		->reserve();

	# reserve() returns undef on error; wait a moment and retry.
	unless( $job ) {
		sleep( 1 );
		next;
	}

	eval {
		# Automatic data deserialization ...
		my $data = $job->args();
		# Doing the magic on the data here ...
	};

	# In case of an error we signal the error in
	# back-end and bury the job.
	if( my $error = $@ ) {
		$logger->log_error( $error );
		$job->bury();
	} else {
		# If everything is ok we simply delete the job
		# from the tube!
		$job->delete();
	}
}
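
Running consumers as separate system processes can be sketched with nothing more than fork and waitpid. The example below is an illustration under assumptions, not part of the application above: run_workers and its callback are hypothetical names, and the callback stands in for the consumer loop. In a real consumer, each child should open its own Beanstalkd connection after the fork rather than share the parent's socket.

```perl
use strict;
use warnings;

# Start $count child processes, run $work in each of them and wait
# for all of them. Returns a hash reference of pid => exit code.
sub run_workers {
    my ( $count, $work ) = @_;
    my @pids;

    for my $n ( 1 .. $count ) {
        my $pid = fork();
        die "fork failed: $!" unless defined $pid;

        if ( $pid == 0 ) {
            # Child process: run the work and report the outcome
            # through the exit code (0 = success, 1 = failure).
            my $ok = eval { $work->( $n ); 1 };
            exit( $ok ? 0 : 1 );
        }

        # Parent process: remember the child and keep going.
        push @pids, $pid;
    }

    my %status;
    for my $pid ( @pids ) {
        waitpid( $pid, 0 );
        $status{$pid} = $? >> 8;
    }
    return \%status;
}

# Three workers; the second one fails on purpose.
my $status = run_workers( 3, sub {
    my ( $n ) = @_;
    die "worker $n failed\n" if $n == 2;
} );

my $failed = grep { $_ != 0 } values %{ $status };
print "workers: ", scalar( keys %{ $status } ), ", failed: $failed\n";
```

Because the workers share nothing but the queue, the same script can be started on a second machine to scale horizontally without any change.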

Conclusions

On a more personal note, I've always liked simple and elegant solutions that involve a minimum set of rules and simple terminology. Beanstalkd is a perfect example of this. It is important to note, however, that introducing this service requires a certain integration effort, and that at this point in the development of an application no one should try to re-invent the wheel by building a queue of their own.

Another vital aspect is that a distributed system used in this manner lets you compress or stretch execution time by splitting the work into independent fragments. Therefore, a process which, running sequentially, could take a few weeks to complete may be reduced to a few days or even a few hours, depending on the duration of the basic unit of work and on the number of consumers.
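
A back-of-the-envelope calculation makes this concrete. The sketch below assumes an idealized case: the jobs are independent, they all take the same time, and the consumers are never idle. The function names and the figures (10,000 videos, 3 minutes each, 120 uploads per hour) are invented for the illustration.

```perl
use strict;
use warnings;
use POSIX qw( ceil );

# Wall-clock hours needed to drain a backlog of $jobs jobs when
# $consumers consumers each handle one job at a time.
sub estimated_hours {
    my ( $jobs, $minutes_per_job, $consumers ) = @_;
    my $rounds = ceil( $jobs / $consumers );
    return $rounds * $minutes_per_job / 60;
}

# Smallest number of consumers that keeps up with a steady
# arrival rate without letting the queue grow.
sub consumers_needed {
    my ( $jobs_per_hour, $minutes_per_job ) = @_;
    return ceil( $jobs_per_hour * $minutes_per_job / 60 );
}

printf "sequential: %d hours\n",   estimated_hours( 10_000, 3, 1 );     # 500
printf "50 consumers: %d hours\n", estimated_hours( 10_000, 3, 50 );    # 10
printf "consumers for 120 uploads/hour: %d\n", consumers_needed( 120, 3 );    # 6
```

Under these assumptions, a backlog that would take about three weeks on a single process is cleared in ten hours by fifty consumers; real numbers will be less tidy, because job durations vary and storage or network limits eventually cap the gain.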

Pros

  • Speed
  • Persistence
  • It doesn't impose any serialization format

Cons

  • Distribution across several servers is supported only on the client side
  • Lack of a security model
