#!/usr/bin/env python3 """ One note, but moar """ import asyncio from os import getenv from pathlib import Path HOST = '0.0.0.0' PORT = 1337 PWL = getenv('PWL', 10) DDIR = Path('./data') FLAG = getenv('FLAG', 'ctf{placeholder}') ADMIN_USER_ID = 1 class Client: def __init__(self, reader, writer): self.reader = reader self.writer = writer self._db_path = Path('./data/accounts.db') self._db_path.touch() # setup database self.set_password(0, "."*PWL, admin_ok=True) self.set_password(ADMIN_USER_ID, "x"*PWL, admin_ok=True) self._write_note(ADMIN_USER_ID, 'flag', FLAG, exists_ok=True) @property def database(self): return self._db_path.read_text() @database.setter def database(self, value): self._db_path.write_text(value) async def get_next_user_id(self): i = 2 while True: if not await self.get_password(i): return i i += 1 """ utils """ async def get_password(self, user_id: int, validate: str = None): idx = (user_id * PWL) pw = self.database[idx:idx+PWL].strip(' ') print(f'[get_password] {idx=} {PWL=} {pw=} {validate=}') if pw == "x"*PWL: await self.send_message('cannot access disabled user') raise ValueError('cannot access disabled user') if validate and pw != validate: await self.send_message(f"invalid password: {validate}") return pw def set_password(self, user_id: int, password: str, admin_ok=False): if user_id == ADMIN_USER_ID and not admin_ok: self.send_message(f"admin user cannot be modified!") return idx = (user_id * PWL) new = f"{password:<{PWL}}" self.database = self.database[:idx] + new + self.database[idx + len(new):] print(f'[set_password] {idx=} {new=} {self.database=}') async def send_message(self, message): """Send a message to the client""" self.writer.write(message.encode() + b'\n') await self.writer.drain() async def receive_input(self, prompt, bytes: int=None): """Send a prompt and receive input from the client""" self.writer.write(prompt.encode()) await self.writer.drain() if bytes: data = await self.reader.read(bytes) else: data = await self.reader.readline() return data.decode().strip() async def read_password(self, prompt="Enter password: "): await self.send_message(f"Password must be between 1 and {PWL} characters, and must not contain spaces.") return await self.receive_input(prompt) async def read_user_id(self): user_id_str = await self.receive_input("Enter User ID: ") return int(user_id_str) """ actions """ async def register_user(self): """Register a new user with a password""" password = await self.read_password() user_id = await self.get_next_user_id() self.set_password(user_id, password) (DDIR / str(user_id)).mkdir(parents=True, exist_ok=True) await self.send_message(f"User registered successfully! Your User ID is {user_id}, use this to login") async def change_password(self): """Change a user's password""" user_id = await self.read_user_id() password = await self.read_password("Enter current password: ") await self.get_password(user_id, validate=password) new_password = await self.read_password("Enter new password: ") print(f"[change_password] {user_id=} {password=} {new_password=}") if new_password == "x"*PWL: await self.send_message(f"Invalid password!") self.set_password(user_id, new_password) async def get_note(self): """Retrieve a note for a user""" user_id = await self.read_user_id() password = await self.receive_input("Enter password: ") if password != await self.get_password(user_id): await self.send_message("incorrect password") return note_name = await self.receive_input("Enter note name: ") # Read note note = DDIR / str(user_id) / f"{note_name}.txt" if not note.is_file(): await self.send_message("note not found") with open(note, 'r') as f: content = f.read() await self.send_message(f"{'='*10}\n{content}\n{'='*10}") async def write_note(self): """Write a note for a user""" user_id = await self.read_user_id() password = await self.read_password() note_name = await self.receive_input("Enter note name: ") note_content = await self.receive_input("Enter note content: ", bytes=256) await self.get_password(user_id, validate=password) await self._write_note(user_id, note_name, note_content) async def _write_note(self, user_id: int, name, content, exists_ok=False): note_path = DDIR / str(user_id) / f"{name}.txt" if note_path.exists() and not exists_ok: await self.send_message("note already exists") return note_path.parent.mkdir(parents=True, exist_ok=True) try: with open(note_path, 'w') as f: f.write(content) except Exception as e: print(e) async def list_notes(self): """List all notes for a user""" user_id = await self.read_user_id() password = await self.read_password() await self.get_password(user_id, validate=password) # List files in user directory user_dir = DDIR / str(user_id) if not user_dir.exists(): return print('iter') for file in user_dir.iterdir(): print('file', file.name) if file.name.endswith('.txt'): await self.send_message(file.name.rstrip('.txt')) print('loop done') async def handle(self): """Handle a client connection""" await self.send_message("=" * 50) await self.send_message(f"Welcome to Ten Note, managing notes for {len(self.database) / PWL} users!") await self.send_message("=" * 50) while True: await self.send_message("") await self.send_message("[R]egister User") await self.send_message("[C]hange Password") await self.send_message("[G]et Note") await self.send_message("[W]rite Note") await self.send_message("[L]ist Notes") await self.send_message("[Q]uit") choice = (await self.receive_input("\nEnter your choice: ")).upper() if choice == 'R': await self.register_user() elif choice == 'C': await self.change_password() elif choice == 'G': await self.get_note() elif choice == 'W': await self.write_note() elif choice == 'L': await self.list_notes() elif choice == 'Q': await self.send_message("Goodbye!") return else: await self.send_message("Invalid choice. Please try again.") async def main(): """Main server loop""" server = await asyncio.start_server( handle_client, HOST, PORT ) addr = server.sockets[0].getsockname() print(f"[*] Server listening on {addr[0]}:{addr[1]}") print(f"[*] Connect with: nc localhost {addr[1]}") async with server: await server.serve_forever() async def handle_client(reader, writer): """Handle a single client connection""" address = writer.get_extra_info('peername') print(f"[+] Connection from {address[0]}:{address[1]}") try: # You'll need to adapt your Client class to work with reader/writer # instead of a socket await Client(reader, writer).handle() except Exception as e: print(f"[-] Error handling client: {e}") finally: writer.close() await writer.wait_closed() print(f"[-] Connection closed from {address[0]}:{address[1]}") if __name__ == "__main__": asyncio.run(main())