Files
Text-to-speach-bot/run.py
BaileyCodes 25ee6fc70f Balls haha
Added main code
2026-03-06 17:36:15 +00:00

187 lines
5.7 KiB
Python

import discord
import asyncio
import os
import tempfile
import edge_tts
"""USER ID : WHAT THEY WANT AS THEIR VOICE"""
USER_VOICES = {
1455496506856702096: "en-US-AvaNeural",
1391109314529198152: "en-US-GuyNeural",
1188310664679260240: "en-GB-ThomasNeural",
1133187384863895702: "en-US-AndrewNeural",
1242286951273594971: "en-US-JennyNeural",
984087525352153189: "en-IN-PrabhatNeural",
835554650912325632: "en-US-AnaNeural",
1335052132923080734: "en-US-AriaNeural"
}
DEFAULT_VOICE = "en-US-JennyNeural"
PREFIX = '~'
intents = discord.Intents.default()
intents.message_content = True
intents.voice_states = True
bot = discord.Client(intents=intents)
guild_data = {}
async def ensure_voice(guild_id, target_channel):
"""Ensure the bot is connected to the given voice channel."""
data = guild_data.get(guild_id)
if not data:
return None
vc = data['vc']
if vc and vc.is_connected():
if vc.channel.id == target_channel.id:
return vc
else:
print(f"[Debug] Moving from {vc.channel.name} to {target_channel.name}")
await vc.disconnect()
vc = None
if vc is None or not vc.is_connected():
try:
vc = await target_channel.connect()
data['vc'] = vc
print(f"[Debug] Connected to {target_channel.name}")
except Exception as e:
print(f"[Debug] Failed to connect to {target_channel.name}: {e}")
return None
return vc
async def queue_processor(guild_id):
"""Process messages from the queue for a specific guild."""
data = guild_data[guild_id]
queue = data['queue']
while True:
content, target_vc, user_id = await queue.get()
print(f"[Debug] Processing message for guild {guild_id} from user {user_id}: '{content}'")
vc = await ensure_voice(guild_id, target_vc)
if not vc:
print(f"[Debug] No voice connection, skipping message")
continue
voice_name = USER_VOICES.get(user_id, DEFAULT_VOICE)
print(f"[Debug] Using voice: {voice_name}")
try:
with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as tmp:
tmp_path = tmp.name
communicate = edge_tts.Communicate(text=content, voice=voice_name)
await communicate.save(tmp_path)
source = discord.FFmpegPCMAudio(tmp_path)
def after_playing(error):
os.unlink(tmp_path)
if error:
print(f"[Debug] Playback error: {error}")
vc.play(source, after=after_playing)
print(f"[Debug] Started playing: '{content}'")
while vc.is_playing():
await asyncio.sleep(0.1)
print(f"[Debug] Finished playing: '{content}'")
except Exception as e:
print(f"[Debug] Error during TTS playback: {e}")
if os.path.exists(tmp_path):
os.unlink(tmp_path)
async def reset_timer(guild, voice_channel):
"""Reset the 5-minute disconnect timer for a guild."""
data = guild_data.get(guild.id)
if not data:
return
if data['timer']:
data['timer'].cancel()
async def timer_func():
try:
await asyncio.sleep(300)
print(f"[Debug] 5 minutes passed, disconnecting from {guild.name}")
if guild.id in guild_data:
vc = guild_data[guild.id].get('vc')
if vc and vc.is_connected():
await vc.disconnect()
guild_data[guild.id]['vc'] = None
except asyncio.CancelledError:
print(f"[Debug] Timer cancelled for {guild.name}")
pass
data['timer'] = asyncio.create_task(timer_func())
print(f"[Debug] Timer reset for {guild.name} (will disconnect in 5 min)")
@bot.event
async def on_ready():
print(f'[Debug] Logged in as {bot.user} (ID: {bot.user.id})')
print('------')
@bot.event
async def on_message(message):
if message.author.bot:
return
if message.author.id not in USER_VOICES:
return
if not message.content.startswith(PREFIX):
return
text = message.content[len(PREFIX):].strip()
if not text:
print(f"[Debug] Ignored empty command from {message.author}")
return
print(f"[Debug] Received command from {message.author}: '{text}'")
if not message.author.voice or not message.author.voice.channel:
print(f"[Debug] User {message.author} is not in a voice channel. Ignoring.")
return
voice_channel = message.author.voice.channel
guild = message.guild
print(f"[Debug] User is in voice channel: {voice_channel.name}")
if guild.id not in guild_data:
guild_data[guild.id] = {
'queue': asyncio.Queue(),
'task': None,
'timer': None,
'vc': None
}
guild_data[guild.id]['task'] = asyncio.create_task(queue_processor(guild.id))
print(f"[Debug] Initialized data for guild {guild.name}")
await guild_data[guild.id]['queue'].put((text, voice_channel, message.author.id))
print(f"[Debug] Added to queue for guild {guild.name}")
await reset_timer(guild, voice_channel)
@bot.event
async def on_voice_state_update(member, before, after):
if member == bot.user and after.channel is None:
print(f"[Debug] Bot disconnected from {member.guild.name}")
for gid, data in guild_data.items():
if data.get('vc') and data['vc'].guild.id == member.guild.id:
data['vc'] = None
if data['timer']:
data['timer'].cancel()
break
bot.run('PUT UR TOKEN HERE')