We send a very large number of transactions, so to avoid overloading our server, there is a strict limit of 1 connection per IP address.
You can use ZeroMQ (highly recommended; it's extremely simple to use). The idea is straightforward:
You run a single instance that receives all transactions from our server. Then, from that instance, you share each transaction using the PUB/SUB method.
Here's how it works:
The sender instance publishes the data on a specific port, for example, 8939.
Then the subscribers (SUB sockets — your two, three, or more bots) connect to this port and receive all the transactions.
This way, you only have one connection for all three bots.
ZeroMQ is currently the most efficient solution (latency is practically nonexistent — measured in nanoseconds). But you can also use WebSockets to stream locally (a bit slower — latency in microseconds). This method may be better because you only need to change one URL from "wss://stream.pumpapi.io" to "ws://127.0.0.1:9999":
Here's an example of the data stream sender (just run it and keep it running):
ZeroMQ method. Super fast. Requires a few changes on the bot side.
import asyncio
import websockets
import zmq
import sys
# One PUB socket for the whole process. Windows uses TCP loopback; every other
# platform uses an IPC endpoint.
ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)
sock.bind("tcp://127.0.0.1:8939" if sys.platform == "win32" else "ipc://8939")
async def connect_pumpapi_stream():
    """Forward every message from the upstream WebSocket to the PUB socket.

    Runs forever. Any exception (connection failure, dropped stream, send
    error) is printed, and the connection is re-established after one second.
    """
    while True:
        try:
            async with websockets.connect("wss://stream.pumpapi.io") as ws:
                while True:
                    msg = await ws.recv()
                    # Text frames arrive as str, but pyzmq's send() only accepts
                    # bytes-like objects and raises TypeError on str.
                    if isinstance(msg, str):
                        msg = msg.encode("utf-8")
                    sock.send(msg)
        except Exception as e:
            print(e)
            await asyncio.sleep(1)


asyncio.run(connect_pumpapi_stream())
const WebSocket = require('ws');
const zmq = require('zeromq');
/**
 * Binds a ZeroMQ PUB socket (TCP loopback on Windows, IPC elsewhere) and
 * republishes every message received from the upstream WebSocket on it.
 * The upstream connection is re-opened one second after it closes.
 */
async function createStreamer() {
  const sock = new zmq.Publisher();
  if (process.platform === 'win32') {
    await sock.bind('tcp://127.0.0.1:8939');
  } else {
    await sock.bind('ipc://8939');
  }

  // A zeromq.js socket allows only one send() in flight at a time; a second
  // call made before the first settles rejects. Chaining the sends keeps them
  // strictly sequential (and in arrival order) and logs failures instead of
  // leaving unhandled rejections.
  let pendingSend = Promise.resolve();
  function publish(msg) {
    pendingSend = pendingSend
      .then(() => sock.send(msg))
      .catch((error) => {
        console.error(error);
      });
  }

  async function connectPumpApiStream() {
    while (true) {
      try {
        const ws = new WebSocket('wss://stream.pumpapi.io');
        ws.on('message', (msg) => {
          publish(msg);
        });
        ws.on('error', (error) => {
          console.error(error);
        });
        // Wait until this connection is gone before opening the next one.
        await new Promise((resolve) => {
          ws.on('close', () => {
            console.log('Connection closed, reconnecting...');
            resolve();
          });
        });
        await new Promise(resolve => setTimeout(resolve, 1000));
      } catch (error) {
        console.error(error);
        await new Promise(resolve => setTimeout(resolve, 1000));
      }
    }
  }

  connectPumpApiStream();
}

createStreamer();
use tokio_tungstenite::{connect_async, tungstenite::Message};
use zmq::{Context, Socket, PUB};
use futures_util::{SinkExt, StreamExt};
use std::time::Duration;
/// Creates the PUB socket, binds it (TCP loopback on Windows, IPC elsewhere)
/// and hands it to the forwarding loop.
///
/// # Panics
/// Panics if the socket cannot be created or bound.
#[tokio::main]
async fn main() {
    let zmq_ctx = Context::new();
    let publisher = zmq_ctx.socket(PUB).unwrap();

    let endpoint = if cfg!(target_os = "windows") {
        "tcp://127.0.0.1:8939"
    } else {
        "ipc://8939"
    };
    publisher.bind(endpoint).unwrap();

    connect_pumpapi_stream(publisher).await;
}
/// Forwards every text or binary frame from the upstream WebSocket to `sock`.
///
/// Never returns. After a failed connection attempt *or* after an established
/// stream ends or errors, it waits one second before connecting again, so a
/// server that keeps closing the connection is not hammered in a tight loop.
/// Publish errors are logged and do not interrupt the stream.
async fn connect_pumpapi_stream(sock: Socket) {
    loop {
        match connect_async("wss://stream.pumpapi.io").await {
            // The stream is only read from, so it is not split into halves.
            Ok((mut ws_stream, _)) => {
                while let Some(msg) = ws_stream.next().await {
                    match msg {
                        Ok(Message::Text(text)) => {
                            if let Err(e) = sock.send(&text, 0) {
                                eprintln!("Error sending message: {}", e);
                            }
                        }
                        Ok(Message::Binary(data)) => {
                            if let Err(e) = sock.send(&data, 0) {
                                eprintln!("Error sending message: {}", e);
                            }
                        }
                        Err(e) => {
                            eprintln!("WebSocket error: {}", e);
                            break;
                        }
                        // Control frames (ping/pong/close) are not forwarded.
                        _ => {}
                    }
                }
            }
            Err(e) => {
                eprintln!("Connection error: {}", e);
            }
        }
        // Back off before every reconnect, whatever the reason for the drop.
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
package main
import (
"context"
"fmt"
"log"
"runtime"
"time"
"github.com/gorilla/websocket"
"github.com/pebbe/zmq4"
)
// main creates the PUB socket, binds it (TCP loopback on Windows, IPC
// elsewhere) and starts forwarding the upstream stream to it.
func main() {
	zctx, err := zmq4.NewContext()
	if err != nil {
		log.Fatal(err)
	}
	defer zctx.Term()

	pub, err := zctx.NewSocket(zmq4.PUB)
	if err != nil {
		log.Fatal(err)
	}
	defer pub.Close()

	endpoint := "ipc://8939"
	if runtime.GOOS == "windows" {
		endpoint = "tcp://127.0.0.1:8939"
	}
	if bindErr := pub.Bind(endpoint); bindErr != nil {
		log.Fatal(bindErr)
	}

	connectPumpApiStream(pub)
}
// connectPumpApiStream forwards every message read from the upstream
// WebSocket to sock. It never returns: after a failed dial or a read error it
// waits one second and connects again. Publish errors are printed and do not
// interrupt the stream.
func connectPumpApiStream(sock *zmq4.Socket) {
	for {
		// DialContext with a background context behaves like Dial; using it
		// also puts the file's "context" import to use, which Go requires
		// (an unused import is a compile error).
		conn, _, err := websocket.DefaultDialer.DialContext(context.Background(), "wss://stream.pumpapi.io", nil)
		if err != nil {
			fmt.Printf("Connection error: %v\n", err)
			time.Sleep(1 * time.Second)
			continue
		}

		for {
			_, message, err := conn.ReadMessage()
			if err != nil {
				fmt.Printf("Read error: %v\n", err)
				conn.Close()
				break
			}
			if _, err = sock.SendBytes(message, 0); err != nil {
				fmt.Printf("Send error: %v\n", err)
			}
		}

		time.Sleep(1 * time.Second)
	}
}
Here's an example of your bot that receives transactions (the getter):
Synchronous version:
import zmq
import sys
import orjson
# SUB socket connected to the local publisher: TCP loopback on Windows,
# an IPC endpoint on every other platform.
ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.connect("tcp://127.0.0.1:8939" if sys.platform == "win32" else "ipc://8939")

# An empty prefix subscribes to every message.
sock.setsockopt_string(zmq.SUBSCRIBE, "")

while True:
    # Block until the next message arrives, then decode and print it.
    print(orjson.loads(sock.recv()))
Asynchronous version:
import zmq.asyncio
import sys
import orjson
import asyncio
# Asyncio-aware SUB socket connected to the local publisher: TCP loopback on
# Windows, an IPC endpoint on every other platform.
ctx = zmq.asyncio.Context()
sock = ctx.socket(zmq.SUB)
sock.connect("tcp://127.0.0.1:8939" if sys.platform == "win32" else "ipc://8939")

# An empty prefix subscribes to every message.
sock.setsockopt_string(zmq.SUBSCRIBE, "")


async def runner():
    """Receive messages forever, printing each one decoded from JSON."""
    while True:
        payload = await sock.recv()
        print(orjson.loads(payload))


asyncio.run(runner())
Synchronous version:
const zmq = require('zeromq');
/**
 * Connects a SUB socket to the local publisher (TCP loopback on Windows, IPC
 * elsewhere), subscribes to everything and logs each message parsed as JSON.
 */
async function createClient() {
  const sock = new zmq.Subscriber();
  const endpoint = process.platform === 'win32'
    ? 'tcp://127.0.0.1:8939'
    : 'ipc://8939';
  sock.connect(endpoint);

  // An empty prefix matches every message.
  sock.subscribe('');

  for await (const frames of sock) {
    // Only the first frame of each message is used.
    const data = JSON.parse(frames[0].toString());
    console.log(data);
  }
}

createClient();
Promise-based version:
const zmq = require('zeromq');
/**
 * Connects a SUB socket to the local publisher (TCP loopback on Windows, IPC
 * elsewhere) and polls it with receive(). Each message is parsed as JSON and
 * logged; receive or parse failures are logged and the loop keeps going.
 */
async function createAsyncClient() {
  const sock = new zmq.Subscriber();
  const endpoint = process.platform === 'win32'
    ? 'tcp://127.0.0.1:8939'
    : 'ipc://8939';
  sock.connect(endpoint);

  // An empty prefix matches every message.
  sock.subscribe('');

  for (;;) {
    try {
      const frames = await sock.receive();
      // Only the first frame of each message is used.
      const data = JSON.parse(frames[0].toString());
      console.log(data);
    } catch (error) {
      console.error('Error receiving message:', error);
    }
  }
}

createAsyncClient();
Synchronous version:
use zmq::{Context, SUB};
use serde_json::Value;
/// Synchronous subscriber: connects a SUB socket to the local publisher
/// (TCP loopback on Windows, IPC elsewhere) and prints every message parsed
/// as JSON.
///
/// Receive and JSON errors are logged to stderr and the loop keeps running,
/// so a single malformed message does not kill the bot.
///
/// # Panics
/// Panics if the socket cannot be created, connected or subscribed.
fn main() {
    let context = Context::new();
    let sock = context.socket(SUB).expect("failed to create SUB socket");

    let endpoint = if cfg!(target_os = "windows") {
        "tcp://127.0.0.1:8939"
    } else {
        "ipc://8939"
    };
    sock.connect(endpoint)
        .expect("failed to connect to the publisher endpoint");

    // An empty prefix subscribes to every message.
    sock.set_subscribe(b"").expect("failed to subscribe");

    loop {
        let msg = match sock.recv_bytes(0) {
            Ok(bytes) => bytes,
            Err(e) => {
                eprintln!("Receive error: {}", e);
                continue;
            }
        };
        match serde_json::from_slice::<Value>(&msg) {
            Ok(data) => println!("{}", data),
            Err(e) => eprintln!("JSON parse error: {}", e),
        }
    }
}
Asynchronous version:
use tokio_zmq::{prelude::*, Sub};
use serde_json::Value;
use futures::StreamExt;
/// Asynchronous subscriber: connects a SUB socket to the local publisher and
/// prints the first frame of every received message parsed as JSON.
///
/// Receive errors and JSON parse errors are written to stderr; the loop only
/// ends when the stream yields `None`.
///
/// NOTE(review): this depends on `tokio_zmq` exposing `Sub::new`, `connect`,
/// `set_subscribe` and `stream` exactly as called below, and on that stream
/// being usable with `futures::StreamExt` under `#[tokio::main]` — confirm
/// against the crate version this example targets.
#[tokio::main]
async fn main() {
let mut sock = Sub::new().unwrap();
// TCP loopback on Windows, an IPC endpoint on every other platform.
if cfg!(target_os = "windows") {
sock.connect("tcp://127.0.0.1:8939").unwrap();
} else {
sock.connect("ipc://8939").unwrap();
}
// Empty subscription filter.
sock.set_subscribe("").unwrap();
let mut stream = sock.stream();
while let Some(msg) = stream.next().await {
match msg {
Ok(multipart) => {
// Only the first frame of the multipart message is parsed.
if let Some(data) = multipart.get(0) {
match serde_json::from_slice::<Value>(data) {
Ok(parsed) => println!("{}", parsed),
Err(e) => eprintln!("JSON parse error: {}", e),
}
}
}
Err(e) => eprintln!("Receive error: {}", e),
}
}
}
Synchronous version:
package main
import (
"encoding/json"
"fmt"
"log"
"runtime"
"github.com/pebbe/zmq4"
)
// main connects a SUB socket to the local publisher (TCP loopback on Windows,
// IPC elsewhere), subscribes to everything and prints each message decoded
// from JSON. Receive and decode errors are logged and skipped.
func main() {
	zctx, err := zmq4.NewContext()
	if err != nil {
		log.Fatal(err)
	}
	defer zctx.Term()

	sub, err := zctx.NewSocket(zmq4.SUB)
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Close()

	endpoint := "ipc://8939"
	if runtime.GOOS == "windows" {
		endpoint = "tcp://127.0.0.1:8939"
	}
	if connErr := sub.Connect(endpoint); connErr != nil {
		log.Fatal(connErr)
	}

	// An empty filter subscribes to every message.
	if subErr := sub.SetSubscribe(""); subErr != nil {
		log.Fatal(subErr)
	}

	for {
		raw, recvErr := sub.RecvBytes(0)
		if recvErr != nil {
			log.Printf("Receive error: %v", recvErr)
			continue
		}

		var decoded interface{}
		if parseErr := json.Unmarshal(raw, &decoded); parseErr != nil {
			log.Printf("JSON parse error: %v", parseErr)
			continue
		}
		fmt.Println(decoded)
	}
}
Goroutine-based version:
package main
import (
"encoding/json"
"fmt"
"log"
"runtime"
"time"
"github.com/pebbe/zmq4"
)
// main connects a SUB socket to the local publisher (TCP loopback on Windows,
// IPC elsewhere) and consumes it from a background goroutine, leaving the
// main goroutine parked forever.
func main() {
	zctx, err := zmq4.NewContext()
	if err != nil {
		log.Fatal(err)
	}
	defer zctx.Term()

	sub, err := zctx.NewSocket(zmq4.SUB)
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Close()

	endpoint := "ipc://8939"
	if runtime.GOOS == "windows" {
		endpoint = "tcp://127.0.0.1:8939"
	}
	if connErr := sub.Connect(endpoint); connErr != nil {
		log.Fatal(connErr)
	}

	// An empty filter subscribes to every message.
	if subErr := sub.SetSubscribe(""); subErr != nil {
		log.Fatal(subErr)
	}

	// Receiver goroutine: decode and print each message. A receive error is
	// followed by a 100 ms pause; a decode error just skips the message.
	go func() {
		for {
			raw, recvErr := sub.RecvBytes(0)
			if recvErr != nil {
				log.Printf("Receive error: %v", recvErr)
				time.Sleep(100 * time.Millisecond)
				continue
			}

			var decoded interface{}
			if parseErr := json.Unmarshal(raw, &decoded); parseErr != nil {
				log.Printf("JSON parse error: %v", parseErr)
				continue
			}
			fmt.Println(decoded)
		}
	}()

	// Block forever so the receiver goroutine keeps running.
	select {}
}
WebSockets method. Fast. No changes required. You just need to change the link on the bot side to ws://127.0.0.1:9999.
import asyncio
import websockets
# Local bots currently connected to the relay.
clients = set()


async def client_handler(websocket):
    """Keep a local connection registered in `clients` until it closes."""
    clients.add(websocket)
    try:
        # Nothing is read from the client; just wait for it to disconnect.
        await websocket.wait_closed()
    finally:
        clients.remove(websocket)
async def relay():
    """Forward every upstream message to all connected local clients.

    Runs forever. Whenever the upstream connection ends, whether by an error
    (which is printed) or a normal close, it waits 0.2 s and reconnects.
    """
    print("Relay started.")
    while True:
        try:
            async with websockets.connect("wss://stream.pumpapi.io") as ws:
                async for msg in ws:
                    # broadcast() writes to every open connection without
                    # awaiting and skips closed ones. This replaces one
                    # unreferenced fire-and-forget task per client per message,
                    # whose exceptions were never retrieved.
                    websockets.broadcast(clients, msg)
        except Exception as e:
            print(e)
        await asyncio.sleep(0.2)
        print("Reconnecting...")
async def main():
    """Start the local WebSocket server on localhost:9999, then run the relay."""
    # Keep a reference to the server for as long as the relay is running.
    local_server = await websockets.serve(client_handler, "localhost", 9999)
    await relay()


asyncio.run(main())
const WebSocket = require('ws');
// Local bots currently connected to the relay.
const clients = new Set();

// Local WebSocket server the bots connect to instead of the upstream URL.
const server = new WebSocket.Server({ host: 'localhost', port: 9999 });

server.on('connection', (socket) => {
  clients.add(socket);
  // Forget the client as soon as its connection closes.
  socket.on('close', () => {
    clients.delete(socket);
  });
});
/**
 * Forwards every upstream message to all open local clients.
 *
 * Runs forever. Whenever the upstream connection ends, whether by an error
 * (which is logged) or a normal close, it waits 200 ms and reconnects.
 */
async function relay() {
  console.log('Relay started.');
  while (true) {
    try {
      const ws = new WebSocket('wss://stream.pumpapi.io');
      ws.on('message', (msg, isBinary) => {
        // Keep the upstream frame type. Recent `ws` versions deliver text
        // frames as Buffers, and a Buffer is sent as a binary frame unless
        // told otherwise. Older versions pass no flag, so fall back to the
        // payload type.
        const binary = typeof isBinary === 'boolean'
          ? isBinary
          : typeof msg !== 'string';
        for (const client of clients) {
          if (client.readyState === WebSocket.OPEN) {
            client.send(msg, { binary });
          }
        }
      });
      await new Promise((resolve, reject) => {
        ws.on('close', resolve);
        ws.on('error', reject);
      });
    } catch (e) {
      console.log(e);
    }
    // Back off before every reconnect, including after a normal close, so a
    // server that keeps closing the connection is not hit in a tight loop.
    await new Promise(resolve => setTimeout(resolve, 200));
    console.log('Reconnecting...');
  }
}

relay();
use tokio_tungstenite::{connect_async, tungstenite::Message, WebSocketStream};
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::accept_async;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;
use futures_util::{SinkExt, StreamExt};
use std::time::Duration;
type Clients = Arc<Mutex<HashSet<tokio_tungstenite::WebSocketStream<TcpStream>>>>;
#[tokio::main]
async fn main() {
let clients: Clients = Arc::new(Mutex::new(HashSet::new()));
let clients_clone = Arc::clone(&clients);
tokio::spawn(async move {
let listener = TcpListener::bind("127.0.0.1:9999").await.unwrap();
while let Ok((stream, _)) = listener.accept().await {
let clients = Arc::clone(&clients_clone);
tokio::spawn(async move {
if let Ok(ws_stream) = accept_async(stream).await {
{
let mut clients_lock = clients.lock().await;
clients_lock.insert(ws_stream);
}
}
});
}
});
relay(clients).await;
}
async fn relay(clients: Clients) {
println!("Relay started.");
loop {
match connect_async("wss://stream.pumpapi.io").await {
Ok((ws_stream, _)) => {
let (mut write, mut read) = ws_stream.split();
while let Some(msg) = read.next().await {
match msg {
Ok(Message::Text(text)) => {
let mut clients_lock = clients.lock().await;
for client in clients_lock.iter_mut() {
let _ = client.send(Message::Text(text.clone())).await;
}
}
Ok(Message::Binary(data)) => {
let mut clients_lock = clients.lock().await;
for client in clients_lock.iter_mut() {
let _ = client.send(Message::Binary(data.clone())).await;
}
}
Err(e) => {
println!("{}", e);
break;
}
_ => {}
}
}
}
Err(e) => {
println!("{}", e);
tokio::time::sleep(Duration::from_millis(200)).await;
println!("Reconnecting...");
}
}
}
}
package main
import (
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
var (
	// clients holds every local connection currently attached to the relay;
	// clientsMutex guards it.
	clients      = make(map[*websocket.Conn]bool)
	clientsMutex sync.Mutex

	// upgrader accepts WebSocket upgrades from any origin.
	upgrader = websocket.Upgrader{
		CheckOrigin: func(_ *http.Request) bool { return true },
	}
)
// handleClient upgrades the request to a WebSocket, registers the connection
// in clients and keeps reading (discarding whatever arrives) until the read
// fails. The connection is then unregistered and closed.
func handleClient(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		return
	}

	clientsMutex.Lock()
	clients[conn] = true
	clientsMutex.Unlock()

	defer func() {
		clientsMutex.Lock()
		delete(clients, conn)
		clientsMutex.Unlock()
		conn.Close()
	}()

	for {
		if _, _, readErr := conn.ReadMessage(); readErr != nil {
			return
		}
	}
}
// relay forwards every upstream message to all registered local clients,
// preserving the frame type (text or binary).
//
// It never returns: after a failed dial or a read error it prints the error,
// waits 200 ms and reconnects. A client whose write fails or stalls past the
// write deadline is closed and removed from the registry.
func relay() {
	// Upper bound for a single write, so one stalled client cannot block the
	// relay for everyone else.
	const writeTimeout = time.Second

	fmt.Println("Relay started.")
	for {
		conn, _, err := websocket.DefaultDialer.Dial("wss://stream.pumpapi.io", nil)
		if err != nil {
			fmt.Println(err)
			time.Sleep(200 * time.Millisecond)
			fmt.Println("Reconnecting...")
			continue
		}

		for {
			messageType, message, err := conn.ReadMessage()
			if err != nil {
				fmt.Println(err)
				conn.Close()
				break
			}

			// Write sequentially while holding the lock: a gorilla connection
			// supports only one concurrent writer, so a goroutine per message
			// could interleave writes on the same connection.
			clientsMutex.Lock()
			for client := range clients {
				client.SetWriteDeadline(time.Now().Add(writeTimeout))
				if writeErr := client.WriteMessage(messageType, message); writeErr != nil {
					client.Close()
					delete(clients, client)
				}
			}
			clientsMutex.Unlock()
		}

		time.Sleep(200 * time.Millisecond)
		fmt.Println("Reconnecting...")
	}
}
// main serves local WebSocket clients on 127.0.0.1:9999 in the background and
// runs the relay in the foreground.
func main() {
	http.HandleFunc("/", handleClient)
	go func() {
		// Loopback only: the relay is meant for bots on this machine and the
		// upgrader accepts any origin, so it should not listen on every
		// network interface.
		log.Fatal(http.ListenAndServe("127.0.0.1:9999", nil))
	}()
	relay()
}