From 8553c710f20283caa47b113056e3f70ce1ed64e8 Mon Sep 17 00:00:00 2001 From: jzitnik-dev Date: Fri, 17 Apr 2026 09:49:34 +0200 Subject: [PATCH] initial commit --- .env.example | 10 + .gitignore | 2 + README.md | 0 bot/.dockerignore | 5 + bot/.gitignore | 6 + bot/Cargo.toml | 22 ++ bot/Dockerfile | 25 ++ bot/src/config.rs | 56 ++++ bot/src/main.rs | 25 ++ bot/src/matrix/client.rs | 532 +++++++++++++++++++++++++++++++++++++ bot/src/matrix/mod.rs | 1 + bot/src/omegle/client.rs | 106 ++++++++ bot/src/omegle/mod.rs | 17 ++ bot/src/omegle/protocol.rs | 15 ++ bot/src/state/db.rs | 113 ++++++++ bot/src/state/mod.rs | 2 + bot/src/state/models.rs | 15 ++ bot/src/utils/flags.rs | 13 + bot/src/utils/mod.rs | 1 + docker-compose.yaml | 52 ++++ proxy/Dockerfile | 9 + proxy/main.py | 182 +++++++++++++ proxy/start.sh | 3 + 23 files changed, 1212 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 README.md create mode 100644 bot/.dockerignore create mode 100644 bot/.gitignore create mode 100644 bot/Cargo.toml create mode 100644 bot/Dockerfile create mode 100644 bot/src/config.rs create mode 100644 bot/src/main.rs create mode 100644 bot/src/matrix/client.rs create mode 100644 bot/src/matrix/mod.rs create mode 100644 bot/src/omegle/client.rs create mode 100644 bot/src/omegle/mod.rs create mode 100644 bot/src/omegle/protocol.rs create mode 100644 bot/src/state/db.rs create mode 100644 bot/src/state/mod.rs create mode 100644 bot/src/state/models.rs create mode 100644 bot/src/utils/flags.rs create mode 100644 bot/src/utils/mod.rs create mode 100644 docker-compose.yaml create mode 100644 proxy/Dockerfile create mode 100644 proxy/main.py create mode 100755 proxy/start.sh diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..13bde7e --- /dev/null +++ b/.env.example @@ -0,0 +1,10 @@ +# Matrix credentials +MATRIX_HOMESERVER=https://matrix.jzitnik.dev +MATRIX_USERNAME=@omeglebot:jzitnik.dev +MATRIX_PASSWORD= + +# Selenium Grid configuration +GRID_DASHBOARD_PORT=4444 # Bare in mind that by default there is no password for the VNC connection. So this shouldn't be exposed to the public. + +# Miscellaneous settings +CF_WAIT_TIME=60 # The time you have to manually pass the Cloudflare challenge in seconds. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f3f22cb --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.env +bot-data/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/bot/.dockerignore b/bot/.dockerignore new file mode 100644 index 0000000..251afbb --- /dev/null +++ b/bot/.dockerignore @@ -0,0 +1,5 @@ +target/ +.git/ +*.db +*.db-journal +.env \ No newline at end of file diff --git a/bot/.gitignore b/bot/.gitignore new file mode 100644 index 0000000..d25c92c --- /dev/null +++ b/bot/.gitignore @@ -0,0 +1,6 @@ +/target +Cargo.lock +*.log +.env +config.toml +*.db diff --git a/bot/Cargo.toml b/bot/Cargo.toml new file mode 100644 index 0000000..e3a6733 --- /dev/null +++ b/bot/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "omegle-matrix-client" +version = "0.1.0" +edition = "2024" + +[dependencies] +tokio = { version = "1", features = ["full"] } +tokio-tungstenite = { version = "0.21", features = ["native-tls"] } +futures-util = "0.3" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +rusqlite = { version = "0.30", features = ["bundled"] } +toml = "0.8" +reqwest = { version = "0.12", features = ["json"] } +matrix-sdk = { version = "0.7", features = ["e2e-encryption"] } +anyhow = "1" +tracing = "0.1" +tracing-subscriber = "0.3" +url = "2" +log = "0.4" +async-trait = "0.1" +dashmap = "5" diff --git a/bot/Dockerfile b/bot/Dockerfile new file mode 100644 index 0000000..dd9b10d --- /dev/null +++ b/bot/Dockerfile @@ -0,0 +1,25 @@ +FROM rust:1.85-bookworm AS builder + +WORKDIR /build + +RUN apt-get update && apt-get install -y pkg-config libssl-dev + +COPY Cargo.toml ./ +COPY src ./src + +RUN cargo build --release + +FROM debian:bookworm-slim + +RUN apt-get update && apt-get install -y libssl3 ca-certificates && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY --from=builder /build/target/release/omegle-matrix-client /app/ +COPY config.toml /app/config.toml + +RUN mkdir -p /data && chown -R 1000:1000 /data + +ENV RUST_LOG=info + +ENTRYPOINT ["/app/omegle-matrix-client"] diff --git a/bot/src/config.rs b/bot/src/config.rs new file mode 100644 index 0000000..c9a8250 --- /dev/null +++ b/bot/src/config.rs @@ -0,0 +1,56 @@ +use anyhow::Result; +use serde::Deserialize; +use std::env; +use std::fs; + +#[derive(Deserialize, Clone)] +pub struct Config { + #[serde(default)] + pub matrix: MatrixConfig, + #[serde(default)] + pub omegle: OmegleConfig, +} + +#[derive(Deserialize, Clone, Default)] +pub struct MatrixConfig { + #[serde(default)] + pub homeserver: String, + #[serde(default)] + pub username: String, + #[serde(default)] + pub password: String, +} + +#[derive(Deserialize, Clone, Default)] +pub struct OmegleConfig { + #[serde(default)] + pub websocket_url: String, +} + +impl Config { + pub fn load(path: &str) -> Result { + let homeserver = env::var("MATRIX_HOMESERVER").unwrap_or_default(); + let username = env::var("MATRIX_USERNAME").unwrap_or_default(); + let password = env::var("MATRIX_PASSWORD").unwrap_or_default(); + let websocket_url = env::var("OMEGLE_WEBSOCKET_URL").unwrap_or_default(); + + if !homeserver.is_empty() + || !username.is_empty() + || !password.is_empty() + || !websocket_url.is_empty() + { + return Ok(Config { + matrix: MatrixConfig { + homeserver, + username, + password, + }, + omegle: OmegleConfig { websocket_url }, + }); + } + + let content = fs::read_to_string(path)?; + let config: Config = toml::from_str(&content)?; + Ok(config) + } +} diff --git a/bot/src/main.rs b/bot/src/main.rs new file mode 100644 index 0000000..9345e69 --- /dev/null +++ b/bot/src/main.rs @@ -0,0 +1,25 @@ +mod config; +mod matrix; +mod omegle; +mod state; +mod utils; + +use anyhow::Result; +use std::sync::Arc; +use crate::config::Config; +use crate::state::db::Db; +use crate::matrix::client::MatrixBot; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt::init(); + + let config = Config::load("config.toml")?; + let db_path = std::env::var("DB_PATH").unwrap_or_else(|_| "omegle.db".to_string()); + let db = Arc::new(Db::new(&db_path)?); + + let bot = MatrixBot::new(config, db).await?; + bot.run().await?; + + Ok(()) +} diff --git a/bot/src/matrix/client.rs b/bot/src/matrix/client.rs new file mode 100644 index 0000000..4864ca7 --- /dev/null +++ b/bot/src/matrix/client.rs @@ -0,0 +1,532 @@ +use anyhow::Result; +use matrix_sdk::{ + config::SyncSettings, + Client, + Room, +}; +use matrix_sdk::ruma::{UserId, OwnedEventId}; +use matrix_sdk::ruma::events::room::message::{ + SyncRoomMessageEvent, MessageType, RoomMessageEventContent, + Relation, +}; +use matrix_sdk::ruma::events::relation::Replacement; +use matrix_sdk::ruma::events::room::member::StrippedRoomMemberEvent; +use matrix_sdk::ruma::events::typing::SyncTypingEvent; +use std::sync::Arc; +use dashmap::DashMap; +use tokio::sync::mpsc; +use crate::config::Config; +use crate::state::db::Db; +use crate::omegle::client::WsOmegleClient; +use crate::omegle::OmegleProvider; +use crate::utils::flags::country_code_to_flag; + +pub struct MatrixBot { + client: Client, + db: Arc, + config: Config, + handlers: Arc>>, +} + +#[derive(Debug)] +pub enum BotCommand { + Connect { msg_id: OwnedEventId }, + Match { prefer_same_country: bool, user_id: String, msg_id: OwnedEventId }, + Skip { user_id: String, msg_id: OwnedEventId }, + Pause, + Disconnect, + SendMessage(String), + SendTyping(bool), +} + +// Helper functions for nice messages +async fn send_text(room: &Room, text: &str) -> Result { + let content = RoomMessageEventContent::text_plain(text); + let resp = room.send(content).await?; + Ok(resp.event_id) +} + +async fn send_info(room: &Room, text: &str) -> Result { + let html = format!("{}", text); + let content = RoomMessageEventContent::text_html(text, html); + let resp = room.send(content).await?; + Ok(resp.event_id) +} + +async fn edit_info(room: &Room, event_id: OwnedEventId, text: &str) -> Result { + let html = format!("{}", text); + let mut content = RoomMessageEventContent::text_html(text, html); + content.relates_to = Some(Relation::Replacement(Replacement::new(event_id, content.clone().into()))); + let resp = room.send(content).await?; + Ok(resp.event_id) +} + +async fn send_status(room: &Room, text: &str, emoji: &str) -> Result { + let body = format!("{} {}", emoji, text); + let html = format!("{} {}", emoji, text); + let content = RoomMessageEventContent::text_html(body, html); + let resp = room.send(content).await?; + Ok(resp.event_id) +} + +async fn edit_status(room: &Room, event_id: OwnedEventId, text: &str, emoji: &str) -> Result { + let body = format!("{} {}", emoji, text); + let html = format!("{} {}", emoji, text); + let mut content = RoomMessageEventContent::text_html(body, html); + content.relates_to = Some(Relation::Replacement(Replacement::new(event_id, content.clone().into()))); + let resp = room.send(content).await?; + Ok(resp.event_id) +} + + +impl MatrixBot { + pub async fn new(config: Config, db: Arc) -> Result { + let user_id = UserId::parse(&config.matrix.username)?; + let client = Client::builder() + .homeserver_url(&config.matrix.homeserver) + .build() + .await?; + + client.matrix_auth().login_username(&user_id, &config.matrix.password).send().await?; + + Ok(Self { + client, + db, + config, + handlers: Arc::new(DashMap::new()), + }) + } + + pub async fn run(self) -> Result<()> { + let db_inv = self.db.clone(); + self.client.add_event_handler(move |ev: StrippedRoomMemberEvent, room: Room, client: Client| { + let _ = db_inv.clone(); + async move { + if room.state() == matrix_sdk::RoomState::Invited { + if let Some(user_id) = client.user_id() { + if ev.state_key == user_id.to_string() { + let _ = room.join().await; + } + } + } + } + }); + + let db_msg = self.db.clone(); + let config_msg = self.config.clone(); + let handlers_msg = self.handlers.clone(); + + self.client.add_event_handler(move |ev: SyncRoomMessageEvent, room: Room, client: Client| { + let db = db_msg.clone(); + let config = config_msg.clone(); + let handlers = handlers_msg.clone(); + async move { + if room.state() == matrix_sdk::RoomState::Joined { + let room_id = room.room_id().to_string(); + if let matrix_sdk::ruma::events::room::message::SyncRoomMessageEvent::Original(original) = ev { + // Skip messages from self + if let Some(user_id) = client.user_id() { + if original.sender == user_id { + return; + } + } + + if let MessageType::Text(text) = original.content.msgtype { + let body = text.body.trim(); + if body.starts_with('!') { + handle_command(body, &original.sender, &room, &db, &config, &handlers, &client).await; + } else if let Some(tx) = handlers.get(&room_id) { + let _ = tx.send(BotCommand::SendMessage(body.to_string())).await; + } + } + } + } + } + }); + + let handlers_typing = self.handlers.clone(); + self.client.add_event_handler(move |ev: SyncTypingEvent, room: Room, client: Client| { + let handlers = handlers_typing.clone(); + async move { + if room.state() == matrix_sdk::RoomState::Joined { + let room_id = room.room_id().to_string(); + if let Some(tx) = handlers.get(&room_id) { + let typing = if let Some(user_id) = client.user_id() { + ev.content.user_ids.iter().any(|id| id != user_id) + } else { + !ev.content.user_ids.is_empty() + }; + let _ = tx.send(BotCommand::SendTyping(typing)).await; + } + } + } + }); + + self.client.sync(SyncSettings::default()).await?; + Ok(()) + } +} + +async fn handle_command( + body: &str, + sender: &UserId, + room: &Room, + db: &Db, + config: &Config, + handlers: &Arc>>, + client: &Client, +) { + let parts: Vec<&str> = body.split_whitespace().collect(); + let cmd = parts[0]; + let room_id = room.room_id().to_string(); + let user_id = sender.to_string(); + + match cmd { + "!help" => { + let help_text = "Available commands:
\ + !help - Show this help message
\ + !connect - Connect to Omgle WebSocket
\ + !match [--same-country] - Request a new match (uses your interests)
\ + !skip - Skip current peer and automatch (uses your interests)
\ + !stop - Skip current peer without automatching
\ + !disconnect - Disconnect from Omgle WebSocket
\ + !autoskip add/remove/list <CC> - Manage your automatic skipping list (global)
\ + !interests add/remove/list/clear <interest> - Manage your interests (global)"; + let plain = "Available commands:\n!help, !connect, !match, !skip, !stop, !pause, !disconnect, !autoskip, !interests"; + let content = RoomMessageEventContent::text_html(plain, help_text); + let _ = room.send(content).await; + } + "!connect" => { + if handlers.contains_key(&room_id) { + let _ = send_info(room, "Already connected").await; + return; + } + + let msg_id = match send_info(room, "πŸ”„ Connecting to Omgle...").await { + Ok(id) => id, + Err(_) => return, + }; + + let (tx, rx) = mpsc::channel(100); + handlers.insert(room_id.clone(), tx.clone()); + let room_clone = room.clone(); + let config_clone = config.clone(); + let db_clone = db.conn.clone(); + let db_struct_clone = Arc::new(Db { conn: db_clone }); + let handlers_clone = handlers.clone(); + let client_clone = client.clone(); + + tokio::spawn(async move { + let mut omgle_client = WsOmegleClient::new(); + if let Err(e) = omgle_client.connect(&config_clone.omegle.websocket_url).await { + let _ = edit_info(&room_clone, msg_id, &format!("❌ Failed to connect: {}", e)).await; + handlers_clone.remove(&room_id); + return; + } + + let _ = tx.send(BotCommand::Connect { msg_id }).await; + let _ = omgle_client.request_people_online().await; + + handle_omgle_session(omgle_client, rx, room_clone, db_struct_clone, room_id.clone(), handlers_clone, client_clone).await; + }); + } + "!match" => { + if let Some(tx) = handlers.get(&room_id) { + let prefer_same_country = parts.contains(&"--same-country"); + let msg_id = match send_info(room, "πŸ” Matching...").await { + Ok(id) => id, + Err(_) => return, + }; + let _ = tx.send(BotCommand::Match { prefer_same_country, user_id, msg_id }).await; + } else { + let _ = send_info(room, "❌ Not connected to WebSocket. Use !connect first.").await; + } + } + "!interests" => { + if parts.len() < 2 { return; } + let mut config = db.get_user_config(&user_id).unwrap(); + match parts[1] { + "add" => { + for &i in &parts[2..] { + config.interests.push(i.to_string()); + } + db.update_user_config(&config).unwrap(); + let _ = send_info(room, &format!("βœ… Added interests: {:?}", &parts[2..])).await; + } + "remove" => { + config.interests.retain(|i| !parts[2..].contains(&i.as_str())); + db.update_user_config(&config).unwrap(); + let _ = send_info(room, &format!("πŸ—‘οΈ Removed interests: {:?}", &parts[2..])).await; + } + "list" => { + let _ = send_info(room, &format!("πŸ“ Your interests: {:?}", config.interests)).await; + } + "clear" => { + config.interests.clear(); + db.update_user_config(&config).unwrap(); + let _ = send_info(room, "✨ Interests cleared").await; + } + _ => { + let _ = send_info(room, "❌ Invalid !interests subcommand.").await; + } + } + } + "!skip" => { + if let Some(tx) = handlers.get(&room_id) { + let msg_id = match send_info(room, "⏩ Skipping...").await { + Ok(id) => id, + Err(_) => return, + }; + let _ = tx.send(BotCommand::Skip { user_id, msg_id }).await; + } else { + let _ = send_info(room, "❌ Not connected to WebSocket.").await; + } + } + "!stop" => { + if let Some(tx) = handlers.get(&room_id) { + let _ = tx.send(BotCommand::Pause).await; + } else { + let _ = send_info(room, "❌ Not connected to WebSocket.").await; + } + } + "!disconnect" => { + if let Some(tx) = handlers.get(&room_id) { + let _ = tx.send(BotCommand::Disconnect).await; + } else { + let _ = send_info(room, "❌ Not connected to WebSocket.").await; + } + } + "!autoskip" => { + if parts.len() < 2 { return; } + let mut config = db.get_user_config(&user_id).unwrap(); + match parts[1] { + "add" => { + for &c in &parts[2..] { + config.autoskip_countries.push(c.to_uppercase()); + } + db.update_user_config(&config).unwrap(); + let _ = send_info(room, &format!("βœ… Added to your skip list: {:?}", &parts[2..])).await; + } + "remove" => { + config.autoskip_countries.retain(|c| !parts[2..].contains(&c.as_str())); + db.update_user_config(&config).unwrap(); + let _ = send_info(room, &format!("πŸ—‘οΈ Removed from your skip list: {:?}", &parts[2..])).await; + } + "list" => { + let _ = send_info(room, &format!("πŸ“ Your auto-skip countries: {:?}", config.autoskip_countries)).await; + } + _ => { + let _ = send_info(room, "❌ Invalid !autoskip subcommand.").await; + } + } + } + _ => { + let _ = send_info(room, &format!("❌ Invalid command: {}. Type !help for a list of commands.", cmd)).await; + } + } +} + +async fn handle_omgle_session( + mut omgle: WsOmegleClient, + mut rx: mpsc::Receiver, + room: Room, + db: Arc, + room_id: String, + handlers: Arc>>, + _client: Client, +) { + let mut last_typing = std::time::Instant::now(); + let mut typing_active = false; + let mut last_people_online_request = std::time::Instant::now(); + let mut message_count = 0; + let mut local_typing_active = false; + let mut active_user_id: Option = None; + let mut peer_connected = false; + let mut last_prefer_same_country = false; + let mut pending_msg_id: Option = None; + + loop { + tokio::select! { + cmd = rx.recv() => { + match cmd { + Some(BotCommand::Connect { msg_id }) => { + let _ = edit_info(&room, msg_id, "βœ… Connected to Omgle").await; + }, + Some(BotCommand::Match { prefer_same_country, user_id, msg_id }) => { + if peer_connected { + let _ = omgle.disconnect_peer().await; + } + active_user_id = Some(user_id.clone()); + last_prefer_same_country = prefer_same_country; + pending_msg_id = Some(msg_id); + let user_config = db.get_user_config(&user_id).unwrap(); + let _ = omgle.request_match(prefer_same_country, user_config.interests).await; + + peer_connected = false; + local_typing_active = false; + typing_active = false; + let _ = room.typing_notice(false).await; + + let mut room_state = db.get_room_state(&room_id).unwrap(); + room_state.active_user_id = Some(user_id); + room_state.is_connected = true; + let _ = db.update_room_state(&room_state); + }, + Some(BotCommand::Skip { user_id, msg_id }) => { + if peer_connected { + active_user_id = Some(user_id.clone()); + pending_msg_id = Some(msg_id); + let user_config = db.get_user_config(&user_id).unwrap(); + let _ = omgle.disconnect_peer().await; + + peer_connected = false; + local_typing_active = false; + typing_active = false; + let _ = room.typing_notice(false).await; + + let _ = omgle.request_match(last_prefer_same_country, user_config.interests).await; + + let mut room_state = db.get_room_state(&room_id).unwrap(); + room_state.active_user_id = Some(user_id); + let _ = db.update_room_state(&room_state); + } else { + let _ = edit_info(&room, msg_id, "❌ No stranger to skip.").await; + } + }, + Some(BotCommand::Pause) => { + if peer_connected { + let _ = send_info(&room, "⏸️ Paused (Skipped peer)").await; + let _ = omgle.disconnect_peer().await; + peer_connected = false; + local_typing_active = false; + typing_active = false; + let _ = room.typing_notice(false).await; + } else { + let _ = send_info(&room, "❌ No stranger to pause.").await; + } + }, + Some(BotCommand::Disconnect) => { + let _ = omgle.disconnect().await; + let mut room_state = db.get_room_state(&room_id).unwrap(); + room_state.is_connected = false; + let _ = db.update_room_state(&room_state); + let _ = send_info(&room, "πŸ”Œ Disconnected from Omgle").await; + handlers.remove(&room_id); + return; + }, + Some(BotCommand::SendMessage(text)) => { + if peer_connected { + local_typing_active = false; + let _ = omgle.send_message(&text).await; + message_count += 1; + } + }, + Some(BotCommand::SendTyping(typing)) => { + if peer_connected { + local_typing_active = typing; + let _ = omgle.send_typing(typing).await; + } + }, + None => break, + } + } + ev = omgle.next_event() => { + match ev { + Ok(Some(msg)) => { + match msg.channel.as_str() { + "connected" => { + if let Some(msg_id) = pending_msg_id.take() { + let _ = edit_status(&room, msg_id, "Stranger connected", "🎲").await; + } else { + let _ = send_status(&room, "Stranger connected", "🎲").await; + } + message_count = 0; + peer_connected = true; + } + "peerCountry" => { + if let Ok(data) = serde_json::from_value::(msg.data) { + let flag = country_code_to_flag(&data.country); + let text = format!("Connected to {} {} ({})", flag, data.country_name, &data.country); + + let _ = send_status(&room, &text, "🌍").await; + + if let Some(user_id) = &active_user_id { + let config = db.get_user_config(user_id).unwrap(); + if config.autoskip_countries.contains(&data.country.to_uppercase()) { + pending_msg_id = send_info(&room, "⏩ Auto-skipping...").await.ok(); + let _ = omgle.disconnect_peer().await; + + peer_connected = false; + local_typing_active = false; + typing_active = false; + let _ = room.typing_notice(false).await; + + let _ = omgle.request_match(last_prefer_same_country, config.interests).await; + } + } + } + } + "peopleOnline" => { + if let Some(count) = msg.data.as_u64() { + let topic = format!("🟒 Online: {}", count); + let _ = room.set_room_topic(&topic).await; + } + } + "message" => { + if let Some(text) = msg.data.as_str() { + let _ = send_text(&room, text).await; + message_count += 1; + } + } + "typing" => { + let typing = msg.data.as_bool().unwrap_or(false); + typing_active = typing; + last_typing = std::time::Instant::now(); + let _ = room.typing_notice(typing).await; + } + "disconnect" => { + if peer_connected { + let _ = send_status(&room, "Stranger disconnected", "πŸ˜•").await; + peer_connected = false; + local_typing_active = false; + typing_active = false; + let _ = room.typing_notice(false).await; + + if message_count <= 4 { + if let Some(user_id) = &active_user_id { + pending_msg_id = send_info(&room, "⏩ Automatching...").await.ok(); + let config = db.get_user_config(user_id).unwrap(); + let _ = omgle.request_match(last_prefer_same_country, config.interests).await; + } + } + } + } + _ => {} + } + } + Ok(None) => break, + Err(_) => break, + } + } + _ = tokio::time::sleep(tokio::time::Duration::from_millis(500)) => { + if typing_active && last_typing.elapsed().as_secs() >= 3 { + typing_active = false; + let _ = room.typing_notice(false).await; + } + + if local_typing_active && peer_connected { + let _ = omgle.send_typing(true).await; + } + + if last_people_online_request.elapsed().as_secs() >= 60 { + let _ = omgle.request_people_online().await; + last_people_online_request = std::time::Instant::now(); + } + } + } + } + let mut room_state = db.get_room_state(&room_id).unwrap(); + room_state.is_connected = false; + let _ = db.update_room_state(&room_state); + handlers.remove(&room_id); +} diff --git a/bot/src/matrix/mod.rs b/bot/src/matrix/mod.rs new file mode 100644 index 0000000..b9babe5 --- /dev/null +++ b/bot/src/matrix/mod.rs @@ -0,0 +1 @@ +pub mod client; diff --git a/bot/src/omegle/client.rs b/bot/src/omegle/client.rs new file mode 100644 index 0000000..85a424a --- /dev/null +++ b/bot/src/omegle/client.rs @@ -0,0 +1,106 @@ +use anyhow::{Result, anyhow}; +use async_trait::async_trait; +use futures_util::{SinkExt, StreamExt}; +use serde_json::json; +use tokio::net::TcpStream; +use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; +use tokio_tungstenite::tungstenite::protocol::Message; +use crate::omegle::protocol::OmegleMessage; +use crate::omegle::OmegleProvider; + +pub struct WsOmegleClient { + ws: Option>>, +} + +impl WsOmegleClient { + pub fn new() -> Self { + Self { ws: None } + } +} + +#[async_trait] +impl OmegleProvider for WsOmegleClient { + async fn connect(&mut self, url: &str) -> Result<()> { + let (ws_stream, _) = connect_async(url).await?; + self.ws = Some(ws_stream); + Ok(()) + } + + async fn disconnect(&mut self) -> Result<()> { + if let Some(mut ws) = self.ws.take() { + let _ = ws.close(None).await; + } + Ok(()) + } + + async fn disconnect_peer(&mut self) -> Result<()> { + let ws = self.ws.as_mut().ok_or_else(|| anyhow!("Not connected"))?; + let msg = json!({ + "channel": "disconnect" + }); + ws.send(Message::Text(msg.to_string())).await?; + Ok(()) + } + + async fn request_match(&mut self, prefer_same_country: bool, interests: Vec) -> Result<()> { + let ws = self.ws.as_mut().ok_or_else(|| anyhow!("Not connected"))?; + let match_msg = json!({ + "channel": "match", + "data": { + "data": "text", + "params": { + "interests": interests, + "preferSameCountry": prefer_same_country + } + } + }); + ws.send(Message::Text(match_msg.to_string())).await?; + Ok(()) + } + + async fn send_message(&mut self, text: &str) -> Result<()> { + let ws = self.ws.as_mut().ok_or_else(|| anyhow!("Not connected"))?; + let msg = json!({ + "channel": "message", + "data": text + }); + ws.send(Message::Text(msg.to_string())).await?; + Ok(()) + } + + async fn send_typing(&mut self, typing: bool) -> Result<()> { + let ws = self.ws.as_mut().ok_or_else(|| anyhow!("Not connected"))?; + let msg = json!({ + "channel": "typing", + "data": typing + }); + ws.send(Message::Text(msg.to_string())).await?; + Ok(()) + } + + async fn request_people_online(&mut self) -> Result<()> { + let ws = self.ws.as_mut().ok_or_else(|| anyhow!("Not connected"))?; + let msg = json!({ + "channel": "peopleOnline" + }); + ws.send(Message::Text(msg.to_string())).await?; + Ok(()) + } + + async fn next_event(&mut self) -> Result> { + let ws = self.ws.as_mut().ok_or_else(|| anyhow!("Not connected"))?; + while let Some(msg) = ws.next().await { + match msg? { + Message::Text(text) => { + let omegle_msg: OmegleMessage = serde_json::from_str(&text)?; + return Ok(Some(omegle_msg)); + } + Message::Binary(_) => {} + Message::Ping(_) | Message::Pong(_) => {} + Message::Close(_) => return Ok(None), + Message::Frame(_) => {} + } + } + Ok(None) + } +} diff --git a/bot/src/omegle/mod.rs b/bot/src/omegle/mod.rs new file mode 100644 index 0000000..9de3386 --- /dev/null +++ b/bot/src/omegle/mod.rs @@ -0,0 +1,17 @@ +pub mod protocol; +pub mod client; + +use anyhow::Result; +use async_trait::async_trait; + +#[async_trait] +pub trait OmegleProvider: Send + Sync { + async fn connect(&mut self, url: &str) -> Result<()>; + async fn disconnect(&mut self) -> Result<()>; + async fn disconnect_peer(&mut self) -> Result<()>; + async fn request_match(&mut self, prefer_same_country: bool, interests: Vec) -> Result<()>; + async fn send_message(&mut self, text: &str) -> Result<()>; + async fn send_typing(&mut self, typing: bool) -> Result<()>; + async fn request_people_online(&mut self) -> Result<()>; + async fn next_event(&mut self) -> Result>; +} diff --git a/bot/src/omegle/protocol.rs b/bot/src/omegle/protocol.rs new file mode 100644 index 0000000..d8d365c --- /dev/null +++ b/bot/src/omegle/protocol.rs @@ -0,0 +1,15 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct OmegleMessage { + pub channel: String, + #[serde(default)] + pub data: serde_json::Value, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct CountryData { + pub country: String, + #[serde(rename = "countryName")] + pub country_name: String, +} diff --git a/bot/src/state/db.rs b/bot/src/state/db.rs new file mode 100644 index 0000000..d225475 --- /dev/null +++ b/bot/src/state/db.rs @@ -0,0 +1,113 @@ +use crate::state::models::{RoomState, UserConfig}; +use anyhow::Result; +use rusqlite::{params, Connection}; +use std::sync::{Arc, Mutex}; + +pub struct Db { + pub conn: Arc>, +} + +impl Db { + pub fn new(path: &str) -> Result { + let conn = Connection::open(path)?; + conn.execute( + "CREATE TABLE IF NOT EXISTS room_state ( + room_id TEXT PRIMARY KEY, + is_connected INTEGER DEFAULT 0, + active_user_id TEXT + )", + [], + )?; + conn.execute( + "CREATE TABLE IF NOT EXISTS user_config ( + user_id TEXT PRIMARY KEY, + autoskip_countries TEXT, + interests TEXT + )", + [], + )?; + Ok(Self { + conn: Arc::new(Mutex::new(conn)), + }) + } + + pub fn get_room_state(&self, room_id: &str) -> Result { + let conn = self.conn.lock().unwrap(); + let mut stmt = conn.prepare( + "SELECT room_id, is_connected, active_user_id FROM room_state WHERE room_id = ?", + )?; + let state = stmt.query_row(params![room_id], |row| { + Ok(RoomState { + room_id: row.get(0)?, + is_connected: row.get::<_, i32>(1)? != 0, + active_user_id: row.get(2)?, + }) + }); + + match state { + Ok(s) => Ok(s), + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(RoomState { + room_id: room_id.to_string(), + is_connected: false, + active_user_id: None, + }), + Err(e) => Err(e.into()), + } + } + + pub fn update_room_state(&self, state: &RoomState) -> Result<()> { + let conn = self.conn.lock().unwrap(); + conn.execute( + "INSERT OR REPLACE INTO room_state (room_id, is_connected, active_user_id) VALUES (?, ?, ?)", + params![state.room_id, state.is_connected as i32, state.active_user_id], + )?; + Ok(()) + } + + pub fn get_user_config(&self, user_id: &str) -> Result { + let conn = self.conn.lock().unwrap(); + let mut stmt = conn.prepare( + "SELECT user_id, autoskip_countries, interests FROM user_config WHERE user_id = ?", + )?; + let config = stmt.query_row(params![user_id], |row| { + let countries_str: String = row.get(1).unwrap_or_default(); + let autoskip_countries = if countries_str.is_empty() { + Vec::new() + } else { + countries_str.split(',').map(|s| s.to_string()).collect() + }; + let interests_str: String = row.get(2).unwrap_or_default(); + let interests = if interests_str.is_empty() { + Vec::new() + } else { + interests_str.split(',').map(|s| s.to_string()).collect() + }; + Ok(UserConfig { + user_id: row.get(0)?, + autoskip_countries, + interests, + }) + }); + + match config { + Ok(c) => Ok(c), + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(UserConfig { + user_id: user_id.to_string(), + autoskip_countries: Vec::new(), + interests: Vec::new(), + }), + Err(e) => Err(e.into()), + } + } + + pub fn update_user_config(&self, config: &UserConfig) -> Result<()> { + let conn = self.conn.lock().unwrap(); + let countries_str = config.autoskip_countries.join(","); + let interests_str = config.interests.join(","); + conn.execute( + "INSERT OR REPLACE INTO user_config (user_id, autoskip_countries, interests) VALUES (?, ?, ?)", + params![config.user_id, countries_str, interests_str], + )?; + Ok(()) + } +} diff --git a/bot/src/state/mod.rs b/bot/src/state/mod.rs new file mode 100644 index 0000000..83cdabe --- /dev/null +++ b/bot/src/state/mod.rs @@ -0,0 +1,2 @@ +pub mod db; +pub mod models; diff --git a/bot/src/state/models.rs b/bot/src/state/models.rs new file mode 100644 index 0000000..31db980 --- /dev/null +++ b/bot/src/state/models.rs @@ -0,0 +1,15 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RoomState { + pub room_id: String, + pub is_connected: bool, + pub active_user_id: Option, // User ID whose config is active in this room +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UserConfig { + pub user_id: String, + pub autoskip_countries: Vec, + pub interests: Vec, +} diff --git a/bot/src/utils/flags.rs b/bot/src/utils/flags.rs new file mode 100644 index 0000000..9680287 --- /dev/null +++ b/bot/src/utils/flags.rs @@ -0,0 +1,13 @@ +pub fn country_code_to_flag(code: &str) -> String { + if code.len() != 2 { + return "🏳".to_string(); + } + + code.to_uppercase() + .chars() + .map(|c| { + // regional indicator symbols start at 0x1F1E6 ('A') + char::from_u32(0x1F1E6 + (c as u32 - 'A' as u32)).unwrap_or('?') + }) + .collect() +} diff --git a/bot/src/utils/mod.rs b/bot/src/utils/mod.rs new file mode 100644 index 0000000..11e6cdb --- /dev/null +++ b/bot/src/utils/mod.rs @@ -0,0 +1 @@ +pub mod flags; diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..203b6a0 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,52 @@ +services: + # Selenium Grid with Chromium (or Chrome) + selenium: + image: selenium/standalone-chromium:4.43.0-20260404 + container_name: selenium + shm_size: 2gb + ports: + - "${GRID_DASHBOARD_PORT}:4444" + environment: + - SE_VNC_NO_PASSWORD=1 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:4444/status"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 30s + + # Proxy needed for solving Cloudflare's anti-bot protection. + # It uses a Chrome to get the necessary cookies and then forwards the WebSocket connection to Omegle + omegle-proxy: + build: ./proxy + container_name: omegle-proxy + depends_on: + selenium: + condition: service_healthy + environment: + - CF_WAIT_TIME=${CF_WAIT_TIME} + - TARGET_URL=https://omegleweb.io + - TARGET_WS_URL=wss://omegleweb.io:8443/socket.io/?EIO=4&transport=websocket + - LOCAL_HOST=0.0.0.0 + - LOCAL_PORT=8765 + - HEADLESS=false + - SELENIUM_URL=http://selenium:4444 + + omegle-bot: + build: ./bot + container_name: omegle-bot + environment: + - MATRIX_HOMESERVER=${MATRIX_HOMESERVER} + - MATRIX_USERNAME=${MATRIX_USERNAME} + - MATRIX_PASSWORD=${MATRIX_PASSWORD} + - OMEGLE_WEBSOCKET_URL=ws://omegle-proxy:8765 + - DB_PATH=/data/omegle.db + volumes: + - omegle-bot-data:/data + depends_on: + omegle-proxy: + condition: service_started + restart: unless-stopped + +volumes: + omegle-bot-data: diff --git a/proxy/Dockerfile b/proxy/Dockerfile new file mode 100644 index 0000000..13f8929 --- /dev/null +++ b/proxy/Dockerfile @@ -0,0 +1,9 @@ +FROM python:3.12-slim + +RUN pip install --no-cache-dir setuptools selenium websockets + +WORKDIR /app + +COPY main.py . + +CMD ["python", "main.py"] diff --git a/proxy/main.py b/proxy/main.py new file mode 100644 index 0000000..8aec2fd --- /dev/null +++ b/proxy/main.py @@ -0,0 +1,182 @@ +import os +import time +import asyncio +import websockets +from datetime import datetime +from selenium import webdriver +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.chrome.service import Service +from selenium.webdriver.common.by import By +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC + +TARGET_URL = os.getenv("TARGET_URL", "https://omegleweb.io/") +TARGET_WS_URL = os.getenv("TARGET_WS_URL", "wss://omegleweb.io:8443/socket.io/?EIO=4&transport=websocket") +LOCAL_HOST = os.getenv("LOCAL_HOST", "0.0.0.0") +LOCAL_PORT = int(os.getenv("LOCAL_PORT", "8765")) +HEADLESS = os.getenv("HEADLESS", "false").lower() in ("true", "1", "yes") +CF_WAIT_TIME = int(os.getenv("CF_WAIT_TIME", "60")) +SELENIUM_URL = os.getenv("SELENIUM_URL", "http://localhost:4444") + +credentials = { + "user_agent": None, + "cookies": None +} + +def stealth_js(driver): + driver.execute_script(""" + Object.defineProperty(navigator, 'webdriver', {get: () => undefined}); + Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]}); + Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']}); + window.chrome = {runtime: {}}; + const originalQuery = window.navigator.permissions.query; + window.navigator.permissions.query = (parameters) => ( + parameters.name === 'notifications' ? + Promise.resolve({state: Notification.permission}) : + originalQuery(parameters) + ); + Object.defineProperty(navigator, 'permissions', {get: () => window.navigator.permissions}); + """) + +def extract_credentials(): + print("\n" + "="*50) + print("[*] PHASE 1: SELENIUM EXTRACTION") + print("="*50) + print("[*] Launching Chrome browser...") + + options = Options() + if HEADLESS: + options.add_argument("--headless=new") + options.add_argument("--no-sandbox") + options.add_argument("--disable-dev-shm-usage") + options.add_argument("--disable-gpu") + options.add_argument("--disable-software-rasterizer") + options.add_argument("--disable-extensions") + options.add_argument("--disable-blink-features=AutomationControlled") + options.add_argument("--disable-setuid-sandbox") + options.add_argument("--disable-background-networking") + options.add_argument("--disable-default-apps") + options.add_argument("--disable-sync") + options.add_argument("--disable-translate") + options.add_argument("--metrics-recording-only") + options.add_argument("--mute-audio") + options.add_argument("--no-first-run") + options.add_argument("--safebrowsing-disable-auto-update") + options.add_argument("--ignore-certificate-errors") + options.add_argument("--ignore-ssl-errors") + options.add_argument("--user-data-dir=/tmp/chrome-data") + options.add_argument("--disable-features=IsolateOrigins,site-per-process") + + driver = webdriver.Remote( + command_executor=SELENIUM_URL + "/wd/hub", + options=options + ) + + stealth_js(driver) + + try: + print("[*] Navigating to OmegleWeb homepage...") + driver.get(TARGET_URL) + + wait = WebDriverWait(driver, CF_WAIT_TIME) + wait.until(EC.presence_of_element_located((By.ID, "logo"))) + + print("[*] Time's up! Extracting the goods...") + user_agent = driver.execute_script("return navigator.userAgent;") + selenium_cookies = driver.get_cookies() + cookie_string = "; ".join([f"{c['name']}={c['value']}" for c in selenium_cookies]) + + credentials["user_agent"] = user_agent + credentials["cookies"] = cookie_string + + print("[+] Extraction successful!") + return True + except Exception as e: + print(f"[!] Extraction failed: {e}") + return False + finally: + print("[*] Closing browser...") + driver.quit() + +async def start_bridge(): + async def bridge_handler(local_client): + print(f"\n[{datetime.now().strftime('%H:%M:%S')}] [+] Local client connected to the bridge!") + + while True: + headers = { + "User-Agent": credentials["user_agent"], + "Cookie": credentials["cookies"] + } + + try: + print(f"[*] Attempting tunnel to OmegleWeb...") + async with websockets.connect(TARGET_WS_URL, additional_headers=headers) as target_server: + print("[+] Tunnel established! Relaying messages...") + + async def forward_local_to_target(): + async for message in local_client: + print(f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] [Local -> Omegle]: {message}") + await target_server.send(message) + + async def forward_target_to_local(): + async for message in target_server: + print(f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] [Omegle -> Local]: {message}") + await local_client.send(message) + + t1 = asyncio.create_task(forward_local_to_target()) + t2 = asyncio.create_task(forward_target_to_local()) + + done, pending = await asyncio.wait( + [t1, t2], + return_when=asyncio.FIRST_COMPLETED + ) + + for task in pending: + task.cancel() + + if t1 in done and t1.exception() is None: + print("[-] Local client disconnected.") + break + + print("[-] OmegleWeb connection closed.") + break + + except websockets.exceptions.InvalidStatusCode as e: + if e.status_code == 403: + print(f"[!] HTTP 403 Forbidden: Cookies likely expired. Refreshing...") + loop = asyncio.get_running_loop() + success = await loop.run_in_executor(None, extract_credentials) + if not success: + print("[!] Failed to refresh cookies. Retrying in 5s...") + await asyncio.sleep(5) + continue + else: + print(f"[!] Bridge error (Status {e.status_code}): {e}") + break + except websockets.exceptions.ConnectionClosed: + print("[-] OmegleWeb disconnected.") + break + except Exception as e: + print(f"[!] Bridge error: {type(e).__name__}: {e}") + break + + print(f"[*] Starting local WebSocket proxy on ws://{LOCAL_HOST}:{LOCAL_PORT}") + async with websockets.serve(bridge_handler, LOCAL_HOST, LOCAL_PORT): + await asyncio.Future() + +def main(): + if not extract_credentials(): + print("[!] Initial extraction failed. Exiting.") + return + + print("\n" + "="*50) + print("[*] PHASE 2: LAUNCH THE WEBSOCKET PROXY") + print("="*50) + + try: + asyncio.run(start_bridge()) + except KeyboardInterrupt: + print("\n[*] Shutting down...") + +if __name__ == '__main__': + main() diff --git a/proxy/start.sh b/proxy/start.sh new file mode 100755 index 0000000..3303f7e --- /dev/null +++ b/proxy/start.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +env/bin/python3 main.py