
Sender, Receiver and the Reconnect Semaphore

Understanding how ChannelAdaptor manages its internal threads and the semaphores that coordinate them is essential before tackling QMUX, which builds directly on top of this machinery.

Two threads, one channel

When the adaptor starts, it submits two tasks to an ExecutorService:

  • Sender — reads ISOMsg objects from the <in> Space queue and writes them to the channel.
  • Receiver — reads ISOMsg objects from the channel and writes them to the <out> Space queue.

Both share the same ISOChannel instance. Neither owns the connection: connection management is an emergent property of how they cooperate through two Space semaphores.
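The thread layout can be sketched with plain JDK types. This is a hypothetical model, not jPOS code: LinkedBlockingQueue stands in for the Space queues, strings stand in for ISOMsg, and LoopbackChannel is an invented channel that echoes whatever is sent. It shows the essential shape: two tasks on one ExecutorService, one shared channel, and no direct communication between the tasks.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical model of the ChannelAdaptor thread layout (not the jPOS source).
 * Queues stand in for Space, strings for ISOMsg, LoopbackChannel for ISOChannel.
 */
public class TwoThreadsSketch {
    /** Invented loopback channel: whatever is sent comes back from receive(). */
    static class LoopbackChannel {
        private final BlockingQueue<String> wire = new LinkedBlockingQueue<>();
        void send(String msg) { wire.add(msg); }
        String receive() throws InterruptedException { return wire.take(); }
    }

    final BlockingQueue<String> in = new LinkedBlockingQueue<>();   // the <in> queue
    final BlockingQueue<String> out = new LinkedBlockingQueue<>();  // the <out> queue
    final LoopbackChannel channel = new LoopbackChannel();          // shared by both tasks
    private final ExecutorService executor = Executors.newFixedThreadPool(2);

    void start() {
        executor.submit(() -> {                 // Sender: in queue -> channel
            try {
                while (!Thread.currentThread().isInterrupted())
                    channel.send(in.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        executor.submit(() -> {                 // Receiver: channel -> out queue
            try {
                while (!Thread.currentThread().isInterrupted())
                    out.put(channel.receive());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    void stop() { executor.shutdownNow(); }     // interrupts both tasks

    public static void main(String[] args) throws InterruptedException {
        TwoThreadsSketch adaptor = new TwoThreadsSketch();
        adaptor.start();
        adaptor.in.put("0800");
        System.out.println(adaptor.out.poll(2, TimeUnit.SECONDS)); // the echoed request
        adaptor.stop();
    }
}
```

Neither task knows the other exists; they meet only at the channel and the queues, which is the property the rest of this page relies on.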

The two semaphores

ChannelAdaptor uses two internal Space entries as semaphores. Their keys are derived from the adaptor name and are not configurable:

Semaphore   Key pattern        Purpose
ready       <name>.ready       Signals that the channel is connected and usable
reconnect   <name>.reconnect   Signals that a reconnect is pending; holds off connecting until the entry expires

For an adaptor named upstream those become upstream.ready and upstream.reconnect.

Sender lifecycle

The Sender calls checkConnection() before it ever reads from the queue. checkConnection() is where the actual TCP connect happens:

```java
// simplified
void checkConnection() {
    while (running() && sp.rdp(reconnect) != null) {
        ISOUtil.sleep(delay);            // back off while a reconnect is pending
    }
    if (!channel.isConnected()) {
        disconnect();                    // ensure clean state
        connect();                       // blocks until connected
        sp.out(ready, new Date());       // publish the ready semaphore
    }
}
```
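To see the gate and the ready timestamp interact, here is a single-threaded sketch of the simplified checkConnection() against a tiny TTL-aware map. Everything in it is hypothetical: the maps stand in for Space (one value per key, unlike a real Space queue), the clock is simulated so the reconnect entry's TTL can expire deterministically, and connect() is assumed to always succeed.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical single-threaded model of the simplified checkConnection().
 * The maps stand in for Space (one value per key) and the clock is simulated.
 */
public class CheckConnectionSketch {
    long now = 0;                                        // simulated clock, in ms
    private final Map<String, Object> values = new HashMap<>();
    private final Map<String, Long> expiresAt = new HashMap<>();

    /** Like Space.out(key, value, timeout): the entry vanishes once its TTL elapses. */
    void out(String key, Object value, long ttl) {
        values.put(key, value);
        expiresAt.put(key, now + ttl);
    }

    /** Like Space.rdp(key): non-blocking read that leaves the entry in place. */
    Object rdp(String key) {
        Long exp = expiresAt.get(key);
        if (exp != null && now >= exp) {                 // TTL elapsed: drop the entry
            values.remove(key);
            expiresAt.remove(key);
        }
        return values.get(key);
    }

    void sleep(long ms) { now += ms; }                   // stands in for ISOUtil.sleep

    final long delay = 10_000;                           // reconnect delay, in ms
    boolean connected = false;                           // stands in for channel.isConnected()
    int connects = 0;

    void checkConnection(String ready, String reconnect) {
        while (rdp(reconnect) != null)
            sleep(delay);                                // gate is up: back off
        if (!connected) {
            connected = true;                            // assume connect() succeeds
            connects++;
            out(ready, now, Long.MAX_VALUE - now);       // ready entry, valued with the connect time
        }
    }

    public static void main(String[] args) {
        CheckConnectionSketch s = new CheckConnectionSketch();
        s.out("upstream.reconnect", Boolean.TRUE, 10_000); // gate raised at t=0
        s.checkConnection("upstream.ready", "upstream.reconnect");
        System.out.println("connected at t=" + s.rdp("upstream.ready")); // connected at t=10000
    }
}
```

Raising the gate at t=0 with a 10-second TTL holds the connect back until the simulated clock reaches 10000, and that timestamp is what ends up in upstream.ready.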

Three things to note:

  1. reconnect as a gate. If the reconnect semaphore is present in Space, checkConnection() will not attempt to connect — it sleeps and loops. This prevents the Sender and Receiver from racing to reconnect simultaneously.

  2. ready as a timestamp. Once connected, the Sender publishes new Date() onto upstream.ready. This is more than a boolean flag: the value records the connection time, which is useful for diagnostics. The Receiver waits for this entry before it reads from the channel, and QMUX checks it before sending.

  3. Write failure triggers a reconnect. If channel.send() throws an IOException, the Sender disconnects the channel and, after a short pause, loops back to the top of its run loop. Because checkConnection() runs before every read from the queue, the next iteration reconnects without any extra signalling. The reconnect gate and the wakeup token are needed only when the failure is detected by the other thread, the Receiver, as described next.
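The ordering above, with checkConnection() ahead of every read, can be modelled as a single loop iteration. In this hypothetical sketch a LinkedBlockingDeque stands in for the in queue (offerFirst plays the role of Space.push), strings stand in for ISOMsg, and checkConnection() is reduced to a counter. Anything that is not a message, such as a wakeup token, simply ends the iteration, so the next pass re-enters checkConnection().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of one pass of the Sender loop; not the jPOS source. */
public class SenderLoopSketch {
    final BlockingDeque<Object> in = new LinkedBlockingDeque<>(); // stands in for the <in> queue
    final List<String> sent = new ArrayList<>();                  // what reached the "channel"
    int connectionChecks = 0;

    void checkConnection() { connectionChecks++; }  // placeholder for the gate/connect logic

    /** One pass; pollMillis plays the role of the delay in sp.in(in, delay). */
    void iterate(long pollMillis) throws InterruptedException {
        checkConnection();                                   // always first, before reading
        Object o = in.pollFirst(pollMillis, TimeUnit.MILLISECONDS);
        if (o instanceof String)
            sent.add((String) o);                            // a real message: send it
        // null (poll timed out) or any other token: nothing to send, loop again
    }

    public static void main(String[] args) throws InterruptedException {
        SenderLoopSketch s = new SenderLoopSketch();
        s.in.offerLast("0200");          // queued request
        s.in.offerFirst(Boolean.TRUE);   // wakeup token pushed to the head, like Space.push
        s.iterate(0);                    // consumes the token, sends nothing
        s.iterate(0);                    // sends "0200"
        System.out.println(s.sent + " after " + s.connectionChecks + " connection checks");
    }
}
```

The token is consumed without anything being sent, and the queued request goes out on the following pass, after a second connection check.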

Receiver lifecycle

The Receiver does not manage the connection at all. Its job is simply:

  1. Wait until upstream.ready is present.
  2. Loop on channel.receive(), pushing every ISOMsg it gets onto the out queue.
  3. If the channel breaks (EOF, socket timeout, or any IOException), signal the Sender to reconnect.

The signal is a two-step operation:

```java
sp.out(reconnect, Boolean.TRUE, delay);  // (1) set the reconnect gate
sp.push(in, hashCode());                 // (2) wake up the Sender
```

Step (1) puts an entry onto the reconnect queue with a TTL of delay milliseconds. The TTL is essential: the Sender only reads the gate with rdp() and never removes it, so expiry is the only thing that lowers the gate. Without a TTL, the gate would stay up forever and block every future reconnect.

Step (2) pushes a sentinel onto the in queue. The Sender may be blocking on sp.in(in, delay) — the wakeup token unblocks it immediately so it can enter checkConnection() and detect the reconnect semaphore.

After triggering the reconnect, the Receiver clears the ready semaphore (by calling sp.inp(ready)) and goes back to waiting for it. This is the signal to QMUX (and any other consumer) that the channel is no longer usable for sending.
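The Receiver side can be sketched the same way. This is a hypothetical model: a ConcurrentHashMap holds the single-valued semaphores, a LinkedBlockingDeque stands in for the in queue, the Channel interface is invented for the example, and the reconnect TTL is omitted. It follows the steps above: receive only while ready is present, forward every message to out, and on any IOException raise the gate, wake the Sender and clear ready.

```java
import java.io.EOFException;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical model of the Receiver; not the jPOS source. */
public class ReceiverSketch {
    /** Invented stand-in for ISOChannel.receive(). */
    interface Channel { String receive() throws IOException; }

    final ConcurrentMap<String, Object> semaphores = new ConcurrentHashMap<>();
    final BlockingDeque<Object> in = new LinkedBlockingDeque<>();   // the Sender's queue
    final BlockingQueue<String> out = new LinkedBlockingQueue<>();  // responses for consumers
    final String ready = "upstream.ready";
    final String reconnect = "upstream.reconnect";

    /** Receives until the channel breaks, then hands reconnection over to the Sender. */
    void receiveUntilBroken(Channel channel) {
        if (semaphores.get(ready) == null)
            return;                                   // step 1: only read from a ready channel
        try {
            while (true)
                out.add(channel.receive());           // step 2: forward every message
        } catch (IOException e) {                     // step 3: EOF, timeout or other I/O error
            semaphores.put(reconnect, Boolean.TRUE);  // (1) raise the gate (TTL omitted here)
            in.offerFirst(hashCode());                // (2) wake the Sender; the value is irrelevant
            semaphores.remove(ready);                 // channel is no longer usable
        }
    }

    public static void main(String[] args) {
        ReceiverSketch r = new ReceiverSketch();
        r.semaphores.put(r.ready, new Date());
        Iterator<String> wire = List.of("0210", "0810").iterator();
        r.receiveUntilBroken(() -> {
            if (wire.hasNext()) return wire.next();
            throw new EOFException("peer closed");   // simulated disconnect
        });
        System.out.println(r.out + " gate=" + r.semaphores.get(r.reconnect)
                + " ready=" + r.semaphores.get(r.ready)); // [0210, 0810] gate=true ready=null
    }
}
```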

What happens during a reconnect

Putting it all together, a full disconnect/reconnect cycle looks like this (here the in queue is named upstream-send and the reconnect delay is 10000 ms):

```text
Receiver: channel.receive() throws EOFException
Receiver: sp.out("upstream.reconnect", TRUE, 10000)    ← gate is up, TTL=10s
Receiver: sp.push("upstream-send", wakeup-token)       ← wake Sender
Receiver: sp.inp("upstream.ready")                     ← clear ready flag
Receiver: blocking on sp.rd("upstream.ready", 5000)

Sender:   sp.in("upstream-send") returns wakeup-token  ← woken up
Sender:   enters checkConnection()
Sender:   sp.rdp("upstream.reconnect") != null         ← gate is up
Sender:   ISOUtil.sleep(delay)                         ← back-off
Sender:   ... reconnect TTL expires ...
Sender:   sp.rdp("upstream.reconnect") == null         ← gate is down
Sender:   disconnect() + connect()                     ← TCP handshake
Sender:   sp.out("upstream.ready", new Date())         ← signal ready

Receiver: sp.rd("upstream.ready") returns              ← unblocked
Receiver: resumes channel.receive() loop
```

The reconnect semaphore's TTL acts as a minimum back-off: even if the Sender picks up the wakeup token instantly, it will not attempt to reconnect until delay milliseconds have elapsed. This prevents rapid reconnect storms when the remote host is down.
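The timing can be worked through with a little arithmetic. This hypothetical helper assumes the simplified checkConnection() shown earlier, where the Sender re-checks the gate every delay milliseconds. Under that assumption the connect starts at the first check at or after the gate expires, so when the Sender wakes before expiry the connect always lands between TTL and TTL + delay after the gate was raised.

```java
/** Hypothetical back-off arithmetic for the simplified checkConnection() loop. */
public class BackoffMath {
    /**
     * @param gateRaisedAt when the Receiver put the reconnect entry (ms)
     * @param ttl          TTL of that entry (ms)
     * @param senderWokeAt when the Sender first checked the gate (ms)
     * @param delay        sleep between gate checks (ms)
     * @return time of the first check that finds the gate gone, i.e. when connect() starts
     */
    static long firstConnectAt(long gateRaisedAt, long ttl, long senderWokeAt, long delay) {
        long expiresAt = gateRaisedAt + ttl;
        if (senderWokeAt >= expiresAt)
            return senderWokeAt;                                  // gate already gone
        long sleeps = (expiresAt - senderWokeAt + delay - 1) / delay; // ceiling division
        return senderWokeAt + sleeps * delay;
    }

    public static void main(String[] args) {
        // gate raised at t=0 with a 10 s TTL; Sender woken at t=2500
        System.out.println(firstConnectAt(0, 10_000, 2_500, 10_000)); // 12500 (checks every 10 s)
        System.out.println(firstConnectAt(0, 10_000, 2_500, 1_000));  // 10500 (checks every 1 s)
    }
}
```

With a 10-second check interval the connect slips to 12500 ms; checking once a second brings it to 10500 ms. In neither case does it happen before the 10-second TTL, which is the minimum back-off described above.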

Optional elements that affect this machinery

<keep-alive>

<keep-alive>yes</keep-alive>

When enabled, the Sender writes a keep-alive on the channel (through BaseChannel.sendKeepAlive()) whenever its poll of the in queue times out with nothing to send and the channel is still connected. Periodic writes mean a dead peer eventually surfaces as a send error, instead of a stale TCP connection that appears connected indefinitely.
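A hypothetical sketch of that decision: one Sender iteration polls the in queue for up to delay milliseconds and, if nothing arrives, writes a keep-alive instead of doing nothing. Counters stand in for the actual writes and strings for ISOMsg; in the adaptor the write goes through the channel.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the keep-alive branch of the Sender loop. */
public class KeepAliveSketch {
    final BlockingQueue<Object> in = new LinkedBlockingQueue<>();
    final boolean keepAlive;          // value of <keep-alive>
    boolean connected = true;         // stands in for channel.isConnected()
    int messagesSent = 0;
    int keepAlivesSent = 0;

    KeepAliveSketch(boolean keepAlive) { this.keepAlive = keepAlive; }

    void iterate(long delayMillis) throws InterruptedException {
        Object o = in.poll(delayMillis, TimeUnit.MILLISECONDS);
        if (o instanceof String)
            messagesSent++;                     // normal traffic keeps the link busy
        else if (keepAlive && connected)
            keepAlivesSent++;                   // idle interval: write a keep-alive
    }

    public static void main(String[] args) throws InterruptedException {
        KeepAliveSketch s = new KeepAliveSketch(true);
        s.iterate(0);                           // idle: keep-alive
        s.in.add("0800");
        s.iterate(0);                           // busy: real message, no keep-alive
        System.out.println(s.messagesSent + " message, " + s.keepAlivesSent + " keep-alive");
    }
}
```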

<write-only>

<write-only>yes</write-only>

Skips starting the Receiver entirely. Useful when the remote host only accepts messages and never sends responses (e.g. a one-way notification endpoint). The out queue is simply never populated.
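In code, the flag only decides which workers get started. The sketch below is hypothetical: it assumes the element is read as a case-insensitive yes/no flag and uses worker names instead of real threads.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of how <write-only> changes the set of workers. */
public class WriteOnlySketch {
    /** Assumed parsing: "yes" (any case, surrounding blanks ignored) enables the flag. */
    static boolean isYes(String elementText) {
        return elementText != null && "yes".equalsIgnoreCase(elementText.trim());
    }

    static List<String> workersToStart(String writeOnlyText) {
        List<String> workers = new ArrayList<>();
        workers.add("sender");                  // always needed to drain the in queue
        if (!isYes(writeOnlyText))
            workers.add("receiver");            // skipped for write-only adaptors
        return workers;
    }

    public static void main(String[] args) {
        System.out.println(workersToStart("yes"));  // [sender]
        System.out.println(workersToStart(null));   // [sender, receiver]
    }
}
```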

<wait-for-workers-on-stop>

<wait-for-workers-on-stop>yes</wait-for-workers-on-stop>

When the adaptor stops, it puts a Boolean.TRUE token on the in queue to wake the Sender, which then sees that the adaptor is no longer running and exits. With wait-for-workers-on-stop enabled, the stop sequence also waits for the Sender and Receiver threads to exit before it completes, which is useful when a clean drain of in-flight work is required.
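The stop sequence can be sketched with two plain threads. This is a hypothetical model, not the jPOS implementation: a volatile flag stands in for running(), the Receiver simply polls that flag (in the adaptor, disconnecting the channel is what unblocks receive()), and the token on the in queue is what gets the Sender out of its long poll promptly.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of stopping the adaptor with wait-for-workers-on-stop. */
public class StopSketch {
    final BlockingQueue<Object> in = new LinkedBlockingQueue<>();
    volatile boolean running = true;                 // stands in for running()
    final Thread sender;
    final Thread receiver;

    StopSketch() {
        sender = new Thread(() -> {
            try {
                while (running)
                    in.poll(10, TimeUnit.SECONDS);   // long idle poll; messages would be sent here
            } catch (InterruptedException ignored) {
                // interrupted: just exit
            }
        }, "sender");
        receiver = new Thread(() -> {
            try {
                while (running)
                    Thread.sleep(50);                // stands in for channel.receive()
            } catch (InterruptedException ignored) {
                // interrupted: just exit
            }
        }, "receiver");
    }

    void start() {
        sender.start();
        receiver.start();
    }

    void stop(boolean waitForWorkers) throws InterruptedException {
        running = false;
        in.add(Boolean.TRUE);                        // wake the Sender so it re-checks the flag
        if (waitForWorkers) {                        // <wait-for-workers-on-stop>yes
            sender.join();
            receiver.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        StopSketch s = new StopSketch();
        s.start();
        s.stop(true);
        System.out.println("sender alive: " + s.sender.isAlive()
                + ", receiver alive: " + s.receiver.isAlive());
    }
}
```

Without the token, the Sender could sit in its 10-second poll long after the flag was cleared; with it, stop(true) returns as soon as both threads notice the flag.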

<soft-stop>

<soft-stop>5000</soft-stop>

On stop, waits up to soft-stop milliseconds before disconnecting the channel. This allows any in-flight messages that have already been read from the in queue but not yet sent to complete before the TCP connection is torn down.
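A hypothetical sketch of that bounded wait, assuming in-flight sends are tracked with a counter (an AtomicInteger here): it waits until the counter drops to zero or the soft-stop window elapses, whichever comes first, and only then would the channel be disconnected.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of a soft-stop window; not the jPOS implementation. */
public class SoftStopSketch {
    /** Waits up to softStopMillis for inFlight to reach zero; returns true if it drained. */
    static boolean awaitDrain(AtomicInteger inFlight, long softStopMillis) throws InterruptedException {
        long deadline = System.nanoTime() + softStopMillis * 1_000_000L;
        while (inFlight.get() > 0) {
            if (System.nanoTime() - deadline >= 0)
                return false;                   // window elapsed with sends still pending
            Thread.sleep(10);                   // re-check every 10 ms
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger inFlight = new AtomicInteger(1);
        new Thread(() -> {                      // simulated send that finishes after ~20 ms
            try { Thread.sleep(20); } catch (InterruptedException ignored) { }
            inFlight.decrementAndGet();
        }).start();
        boolean drained = awaitDrain(inFlight, 5_000);  // <soft-stop>5000</soft-stop>
        System.out.println(drained ? "drained, disconnecting" : "timed out, disconnecting anyway");
    }
}
```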

Why this matters for QMUX

QMUX (the multiplexer) sits above the ChannelAdaptor and uses the same Space queues and semaphores to function:

  • It sends requests by putting ISOMsg objects onto the in queue.
  • It reads responses from the out queue.
  • It uses the ready semaphore to determine whether the channel is available before it tries to send.
  • Watching the reconnect semaphore is not strictly necessary: because ready is cleared while a reconnect is in progress, checking ready already keeps it from sending into a broken channel.

Because QMUX interacts with the adaptor entirely through Space, it is completely decoupled from TCP connection management. The ChannelAdaptor can reconnect transparently without QMUX needing to be aware of the underlying socket lifecycle — as long as both sides honour the ready/reconnect semaphore contract described above.
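From a consumer's point of view, the contract reduces to a Space lookup. In this hypothetical sketch a ConcurrentHashMap stands in for the shared Space, a queue for the adaptor's in queue, and strings for ISOMsg; a sender in the role the text gives QMUX checks for the ready entry and only then queues the request, leaving the socket entirely to the adaptor.

```java
import java.util.Date;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical consumer-side view of the ready contract; not the QMUX source. */
public class ReadyContractSketch {
    final Map<String, Object> space = new ConcurrentHashMap<>();     // shared Space stand-in
    final BlockingQueue<String> in = new LinkedBlockingQueue<>();   // the adaptor's in queue
    final String ready = "upstream.ready";

    boolean isConnected() { return space.get(ready) != null; }

    /** Queues the request only while the adaptor advertises a live channel. */
    boolean trySend(String request) {
        if (!isConnected())
            return false;                       // reconnect in progress or never connected
        return in.offer(request);
    }

    public static void main(String[] args) {
        ReadyContractSketch mux = new ReadyContractSketch();
        System.out.println(mux.trySend("0200"));      // false: no ready entry yet
        mux.space.put(mux.ready, new Date());         // what the Sender does after connect()
        System.out.println(mux.trySend("0200"));      // true: request queued
    }
}
```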