❓ Frequently Asked Questions
Here’s some helpful information to get you started.
Which pools do you support?
We support the following platforms for both trading and data stream APIs:
- pump.fun
- pump-amm
- BONK
- Raydium Launchpad
- Raydium CPMM
How can I have more than 2 connections to the data stream?
We send a very large number of transactions, so to avoid overloading our server, there is a strict limit of 1 connection per IP address.
You can use ZeroMQ (highly recommended; it's extremely simple to use). The idea is straightforward:
You run a single instance that receives all transactions from our server. Then, from that instance, you share each transaction using the PUB/SUB method.
Here's how it works: You send data to a specific port, for example, 8939. Then the SUB (subscribers — your two, three, or more bots) connect to this port and receive all the transactions.
This way, you only have one connection for all three bots. ZeroMQ is currently the most efficient solution (latency is practically nonexistent — measured in nanoseconds). But you can also use WebSockets to stream locally (a bit slower — latency in microseconds). This method may be better because you only need to change one URL from "wss://stream.pumpapi.io" to "ws://127.0.0.1:9999":
Here's an example of the data stream sender (just run it and do not close it):
ZeroMQ method. Super fast. Requires a few changes on the bot side
- Python
- JavaScript
- Rust
- Go
import asyncio
import websockets
import zmq # pip install pyzmq
import sys
ctx = zmq.Context()
if sys.platform == "win32":
sock = ctx.socket(zmq.PUB)
sock.bind("tcp://127.0.0.1:8939")
else:
sock = ctx.socket(zmq.PUB)
sock.bind("ipc://8939") # On Linux, IPC is slightly faster than TCP
async def connect_pumpapi_stream():
while True:
try:
async with websockets.connect("wss://stream.pumpapi.io") as ws:
while True:
msg = await ws.recv()
sock.send(msg)
except Exception as e:
print(e)
await asyncio.sleep(1)
asyncio.run(connect_pumpapi_stream())
const WebSocket = require('ws');
const zmq = require('zeromq');
async function createStreamer() {
const sock = new zmq.Publisher();
if (process.platform === 'win32') {
await sock.bind('tcp://127.0.0.1:8939');
} else {
await sock.bind('ipc://8939'); // On Linux, IPC is slightly faster than TCP
}
async function connectPumpApiStream() {
while (true) {
try {
const ws = new WebSocket('wss://stream.pumpapi.io');
ws.on('message', async (msg) => {
await sock.send(msg);
});
ws.on('error', (error) => {
console.error(error);
});
ws.on('close', () => {
console.log('Connection closed, reconnecting...');
setTimeout(connectPumpApiStream, 1000);
});
// Wait for connection to close
await new Promise((resolve) => {
ws.on('close', resolve);
});
} catch (error) {
console.error(error);
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
}
connectPumpApiStream();
}
createStreamer();
use tokio_tungstenite::{connect_async, tungstenite::Message};
use zmq::{Context, Socket, PUB};
use futures_util::{SinkExt, StreamExt};
use std::time::Duration;
#[tokio::main]
async fn main() {
let context = Context::new();
let sock = context.socket(PUB).unwrap();
if cfg!(target_os = "windows") {
sock.bind("tcp://127.0.0.1:8939").unwrap();
} else {
sock.bind("ipc://8939").unwrap(); // On Linux, IPC is slightly faster than TCP
}
connect_pumpapi_stream(sock).await;
}
async fn connect_pumpapi_stream(sock: Socket) {
loop {
match connect_async("wss://stream.pumpapi.io").await {
Ok((ws_stream, _)) => {
let (mut write, mut read) = ws_stream.split();
while let Some(msg) = read.next().await {
match msg {
Ok(Message::Text(text)) => {
if let Err(e) = sock.send(&text, 0) {
eprintln!("Error sending message: {}", e);
}
}
Ok(Message::Binary(data)) => {
if let Err(e) = sock.send(&data, 0) {
eprintln!("Error sending message: {}", e);
}
}
Err(e) => {
eprintln!("WebSocket error: {}", e);
break;
}
_ => {}
}
}
}
Err(e) => {
eprintln!("Connection error: {}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
}
package main
import (
"context"
"fmt"
"log"
"runtime"
"time"
"github.com/gorilla/websocket"
"github.com/pebbe/zmq4"
)
func main() {
ctx, err := zmq4.NewContext()
if err != nil {
log.Fatal(err)
}
defer ctx.Term()
sock, err := ctx.NewSocket(zmq4.PUB)
if err != nil {
log.Fatal(err)
}
defer sock.Close()
if runtime.GOOS == "windows" {
err = sock.Bind("tcp://127.0.0.1:8939")
} else {
err = sock.Bind("ipc://8939") // On Linux, IPC is slightly faster than TCP
}
if err != nil {
log.Fatal(err)
}
connectPumpApiStream(sock)
}
func connectPumpApiStream(sock *zmq4.Socket) {
for {
conn, _, err := websocket.DefaultDialer.Dial("wss://stream.pumpapi.io", nil)
if err != nil {
fmt.Printf("Connection error: %v\n", err)
time.Sleep(1 * time.Second)
continue
}
for {
_, message, err := conn.ReadMessage()
if err != nil {
fmt.Printf("Read error: %v\n", err)
conn.Close()
break
}
_, err = sock.SendBytes(message, 0)
if err != nil {
fmt.Printf("Send error: %v\n", err)
}
}
time.Sleep(1 * time.Second)
}
}
- Python
- JavaScript
- Rust
- Go
Synchronous version:
import zmq # there's also asynchonios package zmq.asyncio
import sys
import orjson # or json
ctx = zmq.Context()
if sys.platform == "win32":
sock = ctx.socket(zmq.SUB)
sock.connect("tcp://127.0.0.1:8939")
else:
sock = ctx.socket(zmq.SUB)
sock.connect("ipc://8939") # On Linux, IPC is slightly faster than TCP
sock.setsockopt_string(zmq.SUBSCRIBE, "")
while True:
msg = sock.recv()
msg = orjson.loads(msg)
print(msg)
Asynchronous version:
import zmq.asyncio
import sys
import orjson
import asyncio
ctx = zmq.asyncio.Context()
if sys.platform == "win32":
sock = ctx.socket(zmq.SUB)
sock.connect("tcp://127.0.0.1:8939")
else:
sock = ctx.socket(zmq.SUB)
sock.connect("ipc://8939") # On Linux, IPC is slightly faster than TCP
sock.setsockopt_string(zmq.SUBSCRIBE, "")
async def runner():
while True:
msg = await sock.recv()
msg = orjson.loads(msg)
print(msg)
asyncio.run(runner())
Synchronous version:
const zmq = require('zeromq');
async function createClient() {
const sock = new zmq.Subscriber();
if (process.platform === 'win32') {
sock.connect('tcp://127.0.0.1:8939');
} else {
sock.connect('ipc://8939'); // On Linux, IPC is slightly faster than TCP
}
sock.subscribe(''); // Subscribe to all messages
for await (const [msg] of sock) {
const data = JSON.parse(msg.toString());
console.log(data);
}
}
createClient();
Promise-based version:
const zmq = require('zeromq');
async function createAsyncClient() {
const sock = new zmq.Subscriber();
if (process.platform === 'win32') {
sock.connect('tcp://127.0.0.1:8939');
} else {
sock.connect('ipc://8939'); // On Linux, IPC is slightly faster than TCP
}
sock.subscribe(''); // Subscribe to all messages
while (true) {
try {
const [msg] = await sock.receive();
const data = JSON.parse(msg.toString());
console.log(data);
} catch (error) {
console.error('Error receiving message:', error);
}
}
}
createAsyncClient();
Synchronous version:
use zmq::{Context, SUB};
use serde_json::Value;
fn main() {
let context = Context::new();
let sock = context.socket(SUB).unwrap();
if cfg!(target_os = "windows") {
sock.connect("tcp://127.0.0.1:8939").unwrap();
} else {
sock.connect("ipc://8939").unwrap(); // On Linux, IPC is slightly faster than TCP
}
sock.set_subscribe(b"").unwrap(); // Subscribe to all messages
loop {
let msg = sock.recv_bytes(0).unwrap();
let data: Value = serde_json::from_slice(&msg).unwrap();
println!("{}", data);
}
}
Asynchronous version:
use tokio_zmq::{prelude::*, Sub};
use serde_json::Value;
use futures::StreamExt;
#[tokio::main]
async fn main() {
let mut sock = Sub::new().unwrap();
if cfg!(target_os = "windows") {
sock.connect("tcp://127.0.0.1:8939").unwrap();
} else {
sock.connect("ipc://8939").unwrap(); // On Linux, IPC is slightly faster than TCP
}
sock.set_subscribe("").unwrap(); // Subscribe to all messages
let mut stream = sock.stream();
while let Some(msg) = stream.next().await {
match msg {
Ok(multipart) => {
if let Some(data) = multipart.get(0) {
match serde_json::from_slice::<Value>(data) {
Ok(parsed) => println!("{}", parsed),
Err(e) => eprintln!("JSON parse error: {}", e),
}
}
}
Err(e) => eprintln!("Receive error: {}", e),
}
}
}
Synchronous version:
package main
import (
"encoding/json"
"fmt"
"log"
"runtime"
"github.com/pebbe/zmq4"
)
func main() {
ctx, err := zmq4.NewContext()
if err != nil {
log.Fatal(err)
}
defer ctx.Term()
sock, err := ctx.NewSocket(zmq4.SUB)
if err != nil {
log.Fatal(err)
}
defer sock.Close()
if runtime.GOOS == "windows" {
err = sock.Connect("tcp://127.0.0.1:8939")
} else {
err = sock.Connect("ipc://8939") // On Linux, IPC is slightly faster than TCP
}
if err != nil {
log.Fatal(err)
}
err = sock.SetSubscribe("") // Subscribe to all messages
if err != nil {
log.Fatal(err)
}
for {
msg, err := sock.RecvBytes(0)
if err != nil {
log.Printf("Receive error: %v", err)
continue
}
var data interface{}
if err := json.Unmarshal(msg, &data); err != nil {
log.Printf("JSON parse error: %v", err)
continue
}
fmt.Println(data)
}
}
Goroutine-based version:
package main
import (
"encoding/json"
"fmt"
"log"
"runtime"
"time"
"github.com/pebbe/zmq4"
)
func main() {
ctx, err := zmq4.NewContext()
if err != nil {
log.Fatal(err)
}
defer ctx.Term()
sock, err := ctx.NewSocket(zmq4.SUB)
if err != nil {
log.Fatal(err)
}
defer sock.Close()
if runtime.GOOS == "windows" {
err = sock.Connect("tcp://127.0.0.1:8939")
} else {
err = sock.Connect("ipc://8939") // On Linux, IPC is slightly faster than TCP
}
if err != nil {
log.Fatal(err)
}
err = sock.SetSubscribe("") // Subscribe to all messages
if err != nil {
log.Fatal(err)
}
// Message processing goroutine
go func() {
for {
msg, err := sock.RecvBytes(0)
if err != nil {
log.Printf("Receive error: %v", err)
time.Sleep(100 * time.Millisecond)
continue
}
var data interface{}
if err := json.Unmarshal(msg, &data); err != nil {
log.Printf("JSON parse error: %v", err)
continue
}
fmt.Println(data)
}
}()
// Keep main goroutine alive
select {}
}
WebSockets method. Fast. No changes required. You just need to change the link on the bot side to ws://127.0.0.1:9999
.
- Python
- JavaScript
- Rust
- Go
import asyncio
import websockets
clients = set()
async def client_handler(websocket):
clients.add(websocket)
try:
await websocket.wait_closed()
finally:
clients.remove(websocket)
async def relay():
print("Relay started.")
while True:
try:
async with websockets.connect("wss://stream.pumpapi.io") as ws:
async for msg in ws:
for client in list(clients):
asyncio.create_task(client.send(msg))
except Exception as e:
print(e)
await asyncio.sleep(0.2)
print("Reconnecting...")
async def main():
server = await websockets.serve(client_handler, "localhost", 9999)
await relay()
asyncio.run(main())
const WebSocket = require('ws');
const clients = new Set();
const server = new WebSocket.Server({ port: 9999, host: 'localhost' });
server.on('connection', (ws) => {
clients.add(ws);
ws.on('close', () => {
clients.delete(ws);
});
});
async function relay() {
console.log('Relay started.');
while (true) {
try {
const ws = new WebSocket('wss://stream.pumpapi.io');
ws.on('message', (msg) => {
for (const client of clients) {
if (client.readyState === WebSocket.OPEN) {
client.send(msg);
}
}
});
await new Promise((resolve, reject) => {
ws.on('close', resolve);
ws.on('error', reject);
});
} catch (e) {
console.log(e);
await new Promise(resolve => setTimeout(resolve, 200));
console.log('Reconnecting...');
}
}
}
relay();
use tokio_tungstenite::{connect_async, tungstenite::Message, WebSocketStream};
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::accept_async;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;
use futures_util::{SinkExt, StreamExt};
use std::time::Duration;
type Clients = Arc<Mutex<HashSet<tokio_tungstenite::WebSocketStream<TcpStream>>>>;
#[tokio::main]
async fn main() {
let clients: Clients = Arc::new(Mutex::new(HashSet::new()));
let clients_clone = Arc::clone(&clients);
tokio::spawn(async move {
let listener = TcpListener::bind("127.0.0.1:9999").await.unwrap();
while let Ok((stream, _)) = listener.accept().await {
let clients = Arc::clone(&clients_clone);
tokio::spawn(async move {
if let Ok(ws_stream) = accept_async(stream).await {
{
let mut clients_lock = clients.lock().await;
clients_lock.insert(ws_stream);
}
}
});
}
});
relay(clients).await;
}
async fn relay(clients: Clients) {
println!("Relay started.");
loop {
match connect_async("wss://stream.pumpapi.io").await {
Ok((ws_stream, _)) => {
let (mut write, mut read) = ws_stream.split();
while let Some(msg) = read.next().await {
match msg {
Ok(Message::Text(text)) => {
let mut clients_lock = clients.lock().await;
for client in clients_lock.iter_mut() {
let _ = client.send(Message::Text(text.clone())).await;
}
}
Ok(Message::Binary(data)) => {
let mut clients_lock = clients.lock().await;
for client in clients_lock.iter_mut() {
let _ = client.send(Message::Binary(data.clone())).await;
}
}
Err(e) => {
println!("{}", e);
break;
}
_ => {}
}
}
}
Err(e) => {
println!("{}", e);
tokio::time::sleep(Duration::from_millis(200)).await;
println!("Reconnecting...");
}
}
}
}
package main
import (
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
var clients = make(map[*websocket.Conn]bool)
var clientsMutex sync.Mutex
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func handleClient(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
clientsMutex.Lock()
clients[conn] = true
clientsMutex.Unlock()
defer func() {
clientsMutex.Lock()
delete(clients, conn)
clientsMutex.Unlock()
conn.Close()
}()
for {
_, _, err := conn.ReadMessage()
if err != nil {
break
}
}
}
func relay() {
fmt.Println("Relay started.")
for {
conn, _, err := websocket.DefaultDialer.Dial("wss://stream.pumpapi.io", nil)
if err != nil {
fmt.Println(err)
time.Sleep(200 * time.Millisecond)
fmt.Println("Reconnecting...")
continue
}
for {
_, message, err := conn.ReadMessage()
if err != nil {
fmt.Println(err)
conn.Close()
break
}
clientsMutex.Lock()
for client := range clients {
go func(c *websocket.Conn) {
c.WriteMessage(websocket.TextMessage, message)
}(client)
}
clientsMutex.Unlock()
}
time.Sleep(200 * time.Millisecond)
fmt.Println("Reconnecting...")
}
}
func main() {
http.HandleFunc("/", handleClient)
go func() {
log.Fatal(http.ListenAndServe(":9999", nil))
}()
relay()
}
Why don't you include the Bonding Curve address in the Data Stream?
We do not send the Bonding Curve in order to optimize server communication. (The less data we send, the faster you receive the message.) Additionally, you probably don’t need the Bonding Curve in most cases. However, if you do need it, we have great news: the Bonding Curve address is actually derived from the token address itself. So you can easily compute the Bonding Curve address offline in a very cheap and computationally efficient way:
- Python
- JavaScript
- Rust
- Go
from solders.pubkey import Pubkey
def get_bonding_curve_from_mint_pump_fun(mint):
mint = Pubkey.from_string(mint)
PUMP_FUN_PROGRAM = Pubkey.from_string("6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P")
return str(Pubkey.find_program_address([b"bonding-curve", bytes(mint)], PUMP_FUN_PROGRAM)[0])
def get_pool_from_mint_bonk(mint):
mint = Pubkey.from_string(mint)
LAUNCHPOOL_PROGRAM = Pubkey.from_string("LanMV9sAd7wArD4vJFi2qDdfnVhFxYSUg6eADduJ3uj")
wsol = Pubkey.from_string("So11111111111111111111111111111111111111112")
return str(Pubkey.find_program_address([b"pool", bytes(mint), bytes(wsol)], LAUNCHPOOL_PROGRAM)[0])
bonk_pool = get_pool_from_mint_bonk('376CSPnY36SY8Xpz33uKheGQCQeSmt7bVZdYQzspbonk') # BONK_TOKEN_ADDRESS_HERE
print(bonk_pool)
pump_fun_bonding_curve_address = get_bonding_curve_from_mint_pump_fun('WncP229sVH4WmziV2k4cgHwGxXN72bCstPNcADvpump') # PUMP_FUN_TOKEN_ADDRESS_HERE
print(pump_fun_bonding_curve_address)
const { PublicKey } = require('@solana/web3.js');
function getBondingCurveFromMintPumpFun(mint) {
const mintPubkey = new PublicKey(mint);
const PUMP_FUN_PROGRAM = new PublicKey("6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P");
const [pda] = PublicKey.findProgramAddressSync(
[Buffer.from("bonding-curve"), mintPubkey.toBuffer()],
PUMP_FUN_PROGRAM
);
return pda.toString();
}
function getPoolFromMintBonk(mint) {
const mintPubkey = new PublicKey(mint);
const LAUNCHPOOL_PROGRAM = new PublicKey("LanMV9sAd7wArD4vJFi2qDdfnVhFxYSUg6eADduJ3uj");
const wsol = new PublicKey("So11111111111111111111111111111111111111112");
const [pda] = PublicKey.findProgramAddressSync(
[Buffer.from("pool"), mintPubkey.toBuffer(), wsol.toBuffer()],
LAUNCHPOOL_PROGRAM
);
return pda.toString();
}
const bonkPool = getPoolFromMintBonk('376CSPnY36SY8Xpz33uKheGQCQeSmt7bVZdYQzspbonk'); // BONK_TOKEN_ADDRESS_HERE
console.log(bonkPool);
const pumpFunBondingCurveAddress = getBondingCurveFromMintPumpFun('WncP229sVH4WmziV2k4cgHwGxXN72bCstPNcADvpump'); // PUMP_FUN_TOKEN_ADDRESS_HERE
console.log(pumpFunBondingCurveAddress);
use solana_program::pubkey::Pubkey;
use std::str::FromStr;
fn get_bonding_curve_from_mint_pump_fun(mint: &str) -> String {
let mint_pubkey = Pubkey::from_str(mint).unwrap();
let pump_fun_program = Pubkey::from_str("6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P").unwrap();
let (pda, _) = Pubkey::find_program_address(
&[b"bonding-curve", mint_pubkey.as_ref()],
&pump_fun_program
);
pda.to_string()
}
fn get_pool_from_mint_bonk(mint: &str) -> String {
let mint_pubkey = Pubkey::from_str(mint).unwrap();
let launchpool_program = Pubkey::from_str("LanMV9sAd7wArD4vJFi2qDdfnVhFxYSUg6eADduJ3uj").unwrap();
let wsol = Pubkey::from_str("So11111111111111111111111111111111111111112").unwrap();
let (pda, _) = Pubkey::find_program_address(
&[b"pool", mint_pubkey.as_ref(), wsol.as_ref()],
&launchpool_program
);
pda.to_string()
}
fn main() {
let bonk_pool = get_pool_from_mint_bonk("376CSPnY36SY8Xpz33uKheGQCQeSmt7bVZdYQzspbonk"); // BONK_TOKEN_ADDRESS_HERE
println!("{}", bonk_pool);
let pump_fun_bonding_curve_address = get_bonding_curve_from_mint_pump_fun("WncP229sVH4WmziV2k4cgHwGxXN72bCstPNcADvpump"); // PUMP_FUN_TOKEN_ADDRESS_HERE
println!("{}", pump_fun_bonding_curve_address);
}
package main
import (
"crypto/sha256"
"fmt"
"github.com/gagliardetto/solana-go"
)
func getBondingCurveFromMintPumpFun(mint string) string {
mintPubkey := solana.MustPublicKeyFromBase58(mint)
pumpFunProgram := solana.MustPublicKeyFromBase58("6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P")
pda, _, _ := solana.FindProgramAddress(
[][]byte{
[]byte("bonding-curve"),
mintPubkey.Bytes(),
},
pumpFunProgram,
)
return pda.String()
}
func getPoolFromMintBonk(mint string) string {
mintPubkey := solana.MustPublicKeyFromBase58(mint)
launchpoolProgram := solana.MustPublicKeyFromBase58("LanMV9sAd7wArD4vJFi2qDdfnVhFxYSUg6eADduJ3uj")
wsol := solana.MustPublicKeyFromBase58("So11111111111111111111111111111111111111112")
pda, _, _ := solana.FindProgramAddress(
[][]byte{
[]byte("pool"),
mintPubkey.Bytes(),
wsol.Bytes(),
},
launchpoolProgram,
)
return pda.String()
}
func main() {
bonkPool := getPoolFromMintBonk("376CSPnY36SY8Xpz33uKheGQCQeSmt7bVZdYQzspbonk") // BONK_TOKEN_ADDRESS_HERE
fmt.Println(bonkPool)
pumpFunBondingCurveAddress := getBondingCurveFromMintPumpFun("WncP229sVH4WmziV2k4cgHwGxXN72bCstPNcADvpump") // PUMP_FUN_TOKEN_ADDRESS_HERE
fmt.Println(pumpFunBondingCurveAddress)
}
Why do you send all transactions through the Data Stream? Can my computer handle that?
Some users worry that we're sending too many transactions — but in reality, it’s not as much as it might sound. We send around 90–150 transactions per second, covering all events across all pools, and each transaction is very small in size.
Even older processors can easily handle this load. In fact, most modern servers, desktops, and laptops would be capable of processing over 30,000 of these lightweight transactions per second without any special optimization. So there's no need to worry — your machine can handle it just fine.
We designed it this way to ensure faster delivery and greater flexibility. Instead of requiring you to query us for updates or specific token transfers, you receive everything in real time — instantly, and with much less complexity on your side.
Where is your server located? I want the lowest possible latency!
Our server is located in New York, close to the Solana RPC infrastructure. This helps ensure the lowest possible latency for trading and transaction processing.
If you're running infrastructure or bots, we recommend hosting them as close to New York as possible for the best performance.
Even if you're not in New York — or your server isn't — don’t worry! Thanks to efficient message delivery and lightweight transactions, performance will still be very fast and more than sufficient for most use cases.
Is it true that BONK has surpassed pump.fun in transaction volume and is rapidly gaining popularity? Should I switch to BONK?
Yes! That’s absolutely true. We are the only provider offering a Bonk + Raydium CPMM transaction Data Stream, and we can confidently say that Pump.Fun's dominance has significantly decreased. At the time of writing, BONK has more transactions than Pump.Fun (approximately 60% vs 40%). You should definitely try our Trade API + Data Stream to cover not just Pump.Fun but BONK as well!
Have a question? Ask us in our Telegram group