Originally published in Rebol Forces.

Reb-IT!
Author: Maarten Koopmans
Date: 21-Mar-2002
Resources: Rugby distribution

Building a server engine

If you want to start using Rebol for server applications, you may consider building a server engine for all network communications. In this article I'll describe the HIgh Performance Engine of Rugby, hipe.

Hipe provides you with a TCP and UDP based server framework, and a simple form of threading.

What kind of features do we expect from a server framework?

  • TCP and UDP capable, as these are the dominant protocols of the Internet.
  • Easy addition of protocol handlers built on top of hipe.
  • Good performance.
  • Some simple form of threading. This is useful to start little tasks while we are in the network event loop.

So let's start our script with

Rebol []
hipe-serv: make object! [

As you can see we put the thing inside an object! to prevent global namespace pollution.

Now let's think some more about the features we want to have, and how we are going to implement this in terms of data we need to keep.

First UDP, which is really simple in Rebol. You wait on a UDP port and if it has an event you copy the data from the UDP datagram. In our case we just pass the data on to an associated handler function.
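
In isolation, outside hipe, receiving UDP data looks roughly like this (port 9000 is just an example):

udp-in: open udp://:9000    ; listen for datagrams on port 9000
wait udp-in                 ; block until a datagram arrives
print copy udp-in           ; copy the data; hipe hands it to a handler instead
close udp-in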

A server process in TCP typically goes like this: a server port is listening, a client connects and the server port accepts. Accepting on the server side results in a dedicated port for the client on the server side as well. All traffic on this port can then be 'handled'.
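
Again in isolation, the blocking TCP variant of that story looks roughly like this (port 8000 is just an example):

listen: open tcp://:8000    ; the listening server port
conn: first listen          ; blocks until a client connects: the accept
print copy conn             ; traffic is handled on the dedicated client port
close conn
close listen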

So it might be convenient to assign a function to every server port that acts as a handler for the protocol (or better stated: IS the protocol). Of course the client port on the server may want to store some data as well, so putting the client port and its associated data in a dynamically created object is handy.

Our data.

;Our list of server ports
server-ports: copy []

These are our server ports that listen for incoming connections. We store them for later reference while processing network events.

;Our server to handler mapping
server-map: copy []

If we give every server port a handler for its protocol, it's handy to store them in a block as well. Now we can use select to get the function! value of the appropriate handler later on.
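
As a reminder, select returns the value that follows the one you search for in a block:

m: [a 1 b 2]
select m 'b    ; == 2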

; The list of ports we wait/all for in our main loop
port-q: copy []

Of course we need to store all accepted client ports as well. Again for IO processing, and whatever comes to mind. But...

; Mapping of ports to objects containing additional info
object-q: copy []

we also wanted to add data bound to the accepted client port! Hence we put the accepted client port in a block, followed by an object! that holds its related data. So we can use select to.... (yes, a pattern is emerging)

; Restricted server list
restricted-server: make block! 20
; Server restrictions?
restrict: no

The boolean restrict field indicates whether we restrict access to the services in this engine. If it is set to "yes", restricted-server is a block of IP addresses that we allow access from.

;The thread queue
threads: copy []
current-thread: none

And these are the things we will need for our threading engine: a block that holds the threads, and current-thread as a 'pointer' to the currently executing code.

conn-timeout: 0:0:30

This is a connection timeout. If a connection has no activity for this specified amount of time, it is dropped. This prevents a server from having lots of open ports that don't do a thing.

max-thread-waiting: 100

max-thread-waiting is a variable we will use to balance network traffic and the threading engine. It works like this: we always process network events and neglect threads, unless there are more than max-thread-waiting of them waiting. In that case we process threads as well. The effect is that setting this number low gives the threading engine more time, and setting it high gives the networking part the edge.
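
For example, lowering it after loading hipe means that threads get a turn sooner, even when network traffic keeps coming in (the value 10 below is arbitrary):

hipe-serv/max-thread-waiting: 10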

Restricting access

We use two functions for restricting access and validating. The first is restrict-to that simply sets our restrict variable to yes and appends a block of IP addresses to our restricted-server variable:

restrict-to: func [
    {Sets server restrictions. The server will only serve to machines with
    the IP-addresses found in the list.}
    r [any-block!] {List of IP-addresses to serve.}
][
    restrict: yes
    append restricted-server r
]

And then we need a function to check that given a certain IP number, a connection is allowed:

allow?: func [
    {Checks if a connection to the specified IP-address is allowed.}
    ip [tuple!] {IP-address to check.}
][
    return found? find restricted-server ip
]

The above function can thus be used when deciding whether or not to accept a connection.
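
For example (with made-up addresses):

hipe-serv/restrict-to [127.0.0.1 10.0.0.5]
hipe-serv/allow? 127.0.0.1    ; == true
hipe-serv/allow? 10.0.0.6     ; == false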

Initializing a server

We need to be able to add server ports, associate them with handlers and look them up. Finding out if a given port (which has an event) is a server port is done by the is-server? function. It simply looks up whether or not the port is in the server-ports block.

is-server?: func [
    {Check to see whether a given port is a server port.}
    p [port!]
][
    return found? find server-ports p
]

This implies that we need to add the server port there as well. And not only that, we need to associate it with a handler function! So add-server-port stores the port in the server-ports block, and stores an association between the port and the handler in the server-map.

add-server-port: func [
    {Adds a server port and its handler to the list and the map}
    p [port!]
    handler [any-function!]
][
    append server-ports p
    append server-map p
    append server-map :handler
    return
]

And remove-server-port does, of course, the opposite:

remove-server-port: func [
    {Removes a server from our list and map}
    p [port!]
][
    remove find server-ports p
    remove remove find server-map p
    return
]

Now let's put it all together in a function init-server-port, that not only stores all information using the above functions, but also opens the port and increases the backlog. The backlog is the number of connections that have not yet been accepted by you but can be waiting for you. The default is in the order of 5. We open everything in no-wait mode by default, so you can write non-blocking handlers. You can always use set-modes to change this behaviour later on.

init-server-port: func [
    {Initializes our main server port.}
    p [port! url!]
    conn-handler [any-function!]
    /local dest
][
    either url? p [dest: make port! p][dest: p]

    add-server-port dest :conn-handler
    append port-q dest

    ; Increase the backlog for this server. 15 should be possible (default
    ; is 5).
    ; REMOVE this for compatibility (e.g. on the Mac) or set it to 5 or so.
    dest/backlog: 15
    open/no-wait dest
]

The threading engine

First we need to decide on the layout of a thread. We choose a very simple model, where a thread is just an object. The object may have all kinds of fields, and some of the fields are just blocks that will be executed using 'do. In order to give the threading engine a hint as to which code to execute, a thread object will always have one field called code-pointer.

A code-pointer is just that: it contains a lit-word! value that is the word of the next code block to 'do. So the threading engine simply does a

do get/any in some-thread some-thread/code-pointer

What's cool about this is that you can use the code-pointer field to control the flow in a thread upon multiple invocations. Note that a thread is itself responsible for handing control back, a threading model commonly referred to as cooperative multithreading.

We will also add a special field, clean-up, which if present contains code that cleans up after a thread. For example to free resources such as files, network ports, database connections or....

Now let's add some functions. First two functions to add and remove threads (or objects):

add-thread: func [o [object!] {The thread to add}][
    append hipe-serv/threads o
    o
]

remove-thread: func [o [object!] {The task to remove}][
    remove find head hipe-serv/threads o
]

And of course, our very simple threading engine. It works very simply. Upon invocation the function process-thread just picks the current-thread and tries to do its code-pointer field. If the code-pointer fails (an error has occurred) the special field 'clean-up is fetched from the thread object.

The code in clean-up can then free any additional resources, and then the thread is removed. If the code-pointer was set to 'clean-up by the thread itself, the clean-up code is executed and the thread is removed from the threads block.

process-thread: func [/local do-thread][
    do-thread: none
    ;Premature return
    if empty? hipe-serv/threads [return]
    ;Are we initialized (cumbersome, but yes)
    if none? current-thread [current-thread: head hipe-serv/threads]
    ;Are we at the end of the queue
    ;Note that this is an entry condition!
    if tail? current-thread [current-thread: head current-thread]
    ;What do we need to do
    do-thread: pick current-thread 1

    if object? do-thread [
        either 'clean-up = do-thread/code-pointer [
            error? try [do do-thread/clean-up]
            remove-thread do-thread
        ][
            if error? try [do get/any in do-thread do-thread/code-pointer][
                if found? find first do-thread 'clean-up [
                    ;do additional cleanup
                    error? try [do do-thread/clean-up]
                ]
                remove-thread do-thread
            ]
        ]
    ]

    if not tail? current-thread [current-thread: next current-thread]
]
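
To make the model concrete, here is a small made-up thread (the names greeter, say-hello and say-bye are invented for this example). Once serve (described below) is looping, process-thread will run say-hello on one pass, say-bye on a later pass, and finally the clean-up block, after which the thread is removed:

greeter: context [
    code-pointer: 'say-hello
    clean-up: [print "greeter is gone"]
    say-hello: [
        print "hello"
        self/code-pointer: 'say-bye     ; hand control back; resume here next time
    ]
    say-bye: [
        print "goodbye"
        self/code-pointer: 'clean-up    ; ask the engine to clean up and remove us
    ]
]
hipe-serv/add-thread greeter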

Accepting connections

Now we need to add some logic to add and remove ports when a connection is accepted. First two functions for adding and removing the port 'as-is':

port-q-delete: func [
    {Removes a port from our port list.}
    target [port!]
][
    remove find port-q target
]

port-q-insert: func [
    {Inserts a port into our port list.}
    target [port!]
][
    append port-q target
]

The function get-handler returns the handler function associated with a given server-port. It is used below.

get-handler: func [
    {Returns the handler for a given server port}
    p [port!]
][
    return select server-map p
]

Remember that in the beginning we decided we'd associate a bunch of data with a port and put that in an object as well? This is what the following function does, and then it stores the object. As you can see the object gets the port, the handler, a timestamp and an empty field called user-data that can be used for anything you'd like when writing handlers.

object-q-insert: func [
    {Inserts a port and its corresponding object into the object queue.}
    serv [port!] {The server port}
    target [port!] {The connection}
    /local o my-handler
][
    my-handler: get-handler serv
    append hipe-serv/object-q target
    o: make object! [
        port: target
        handler: :my-handler
        user-data: none
        lastaccess: now
    ]
    append hipe-serv/object-q o
]

If we can add, we want to delete it as well! Here it goes:

object-q-delete: func [
    {Removes a port and its corresponding object from the object queue.}
    target [port!]
][
    remove/part find hipe-serv/object-q target 2
]

On a higher level we want to simply 'start' or 'stop' a client port. This is what start and stop do, by calling the above functions:

start: func [
    {Initializes everything for a client connection on application level.}
    serv [port!] {The server port}
    conn [port!] {The connection port}
][
    port-q-insert conn
    object-q-insert serv conn
]

stop: func [
    {Cleans up after a client connection.}
    conn [port!]
    /local conn-object
][
    port-q-delete conn
    error? try [
        conn-object: select hipe-serv/object-q conn
        close conn-object/port
        object-q-delete conn
    ]
]

So we are ready for our main 'initializing' code for connections. Here is what we do:

- Are we in restricted mode? If yes, then check and possibly drop the connection.

- If we continue: start the connection.

init-conn-port: func [
    {Initializes everything on network level.}
    serv [port!] {The server port}
    conn [port!] {The connection}
][
    either restrict [
        either allow? conn/remote-ip [
            start serv conn
            return
        ][
            close conn
            return
        ]
    ][
        ; No restrictions
        start serv conn
        return
    ]
]

Monitoring timeouts

In the code above you saw that every initialized TCP connection has received a timestamp. We can use this timestamp to detect if a connection has been idle too long. If it has we may choose to drop it. The monitor function starts a thread that does just that. And it makes for a nice threading example as well!

monitor: func [
    /interval t {the interval time}
    /timeout t1 {The timeout time}
    /local int
][
    if timeout [hipe-serv/conn-timeout: t1]

    int: either interval [t][0:0:5]

    add-thread context [
        code-pointer: 'start-monitor
        interv: int
        last-run: now
        clean-up: []

        start-monitor: [
            set/any 'eee try [
                if now > (last-run + interv) [
                    foreach [p item] hipe-serv/object-q [
                        if now > (item/lastaccess + hipe-serv/conn-timeout) [
                            hipe-serv/stop item/port
                        ]
                    ]
                    self/last-run: now
                ]
            ]
        ]
    ]
]
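
For example, to check every ten seconds and drop connections that have been idle for more than a minute:

hipe-serv/monitor/interval/timeout 0:0:10 0:1:0

Remember that the monitor is itself a thread, so it only does its work once serve (below) is running.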

The main loop

YES! We are ready for our main loop! If you made it to this point you are almost there. Just hold on a little bit longer...

Our first of two functions is process-ports. It is given a block of ports and processes the events on them. For every item in the port block:

- If it is a server and it is a UDP port: copy the data and pass it to the handler.

- If it is a server and it is a TCP port, initialize it.

- If it is a TCP connection port, get the associated object, update the timestamp and call the handler. We pass the object to the handler, so the handler has access to all the information.

process-ports: func [
    {Processes all ports that have events.}
    portz [block!] {The port list}
    /local temp-obj
][
    repeat item portz [
        either (is-server? item) [
            either item/scheme = 'udp [
                ; udp, so call our handler
                temp-obj: get-handler item
                temp-obj copy item
            ][
                init-conn-port item first item
            ]
        ][
            if item/scheme = 'tcp [
                temp-obj: select hipe-serv/object-q item
                temp-obj/lastaccess: now
                temp-obj/handler temp-obj
            ]
        ]
    ]
]

And here is our main loop, called serve.

We wait on all currently known ports, both server and client, with a timeout of 2 milliseconds. We then look at what we get back. If it is none, there has been no network event in these two milliseconds, so we choose to call process-thread.

Otherwise we have a block of ports with events and we call process-ports with that block. If there are more threads waiting than allowed we start processing them anyway.

serve: func [
    {Starts serving. Does a blocking wait until there are events.
    Processes threads in the background as well!}
    /local portz
][
    forever [
        portz: wait/all join port-q 0.002
        either none? portz [
            process-thread
        ][
            process-ports portz
            ; If there are more than max-thread-waiting threads, start processing anyway
            if hipe-serv/max-thread-waiting < length? hipe-serv/threads [
                process-thread
            ]
        ]
    ]
]
]

A sample

So now we have this great engine. Can we use it, I hear you say? Of course! Please take a look at the following handler:

test-handler: func [o [object!]][
    set-modes o/port [no-wait: false]
    temp: copy/part o/port 8
    insert o/port temp
    hipe-serv/stop o/port
]

It first sets the port to blocking mode, for the sake of simplicity of the sample. If you don't do this, you must keep track of how much data you have read, what to do next and so on. If you want an example of that, just check out Rugby. It is nothing more than a non-blocking handler on top of hipe.
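
Just to give you the flavour, a non-blocking handler could use the user-data field of the connection object to keep track of what has arrived so far. The sketch below is made up for this article and leaves out all error handling:

nb-handler: func [o [object!] /local data][
    if none? o/user-data [o/user-data: copy ""]    ; first call: start a buffer
    data: copy o/port                              ; whatever is available right now
    if data [append o/user-data data]
    if 8 <= length? o/user-data [                  ; got our 8 bytes: reply and stop
        insert o/port copy/part o/user-data 8
        hipe-serv/stop o/port
    ]
]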

Back to our test-handler. It copies the first 8 bytes of whatever it is sent (but you must send at least 8 bytes) and returns them. Then it stops the port.

Let's try:

do %hipe.r
hipe-serv/init-server-port tcp://:9002 :test-handler
hipe-serv/serve

Now fire up a second console to act as a client:

p: open tcp://localhost:9002
insert p "1234567891011"
print copy p
>> 12345678
close p

Nice? Now write your own webserver, broker, ....