WSGIServer 0.2 Updated Access
def __init__(self, app):
    """Wrap *app* so that every call made through this object is timed.

    Args:
        app: The callable to wrap; it is invoked as
            ``app(environ, start_response)``.
    """
    self.app = app


def __call__(self, environ, start_response):
    """Call the wrapped app, print how long the call took, return its result.

    Only the call to ``self.app`` is measured; any work done later while the
    returned response is consumed is not included in the reported duration.

    Args:
        environ: Passed through unchanged to the wrapped app.
        start_response: Passed through unchanged to the wrapped app.

    Returns:
        Whatever the wrapped app returned.
    """
    import time

    start = time.time()
    response = self.app(environ, start_response)
    duration = time.time() - start
    # The placeholder braces were missing, so the literal text
    # "duration:.4fs" was printed instead of the measured value.
    print(f"Request took: {duration:.4f}s")
    return response


# Stack the middleware around the base application; the last wrapper applied
# is the outermost one.
app = hello_world_app
app = LoggingMiddleware(app)
app = TimingMiddleware(app)
Overview

WSGIServer 0.2 is a lightweight, pure-Python WSGI HTTP server compliant with PEP 3333. This guide covers development setup, architecture, implementation details, testing, and deployment.

1. Project Structure

wsgiserver/
├── wsgiserver/
│   ├── __init__.py
│   ├── server.py        # Main server class
│   ├── handler.py       # Request handler
│   ├── connection.py    # Connection management
│   ├── request.py       # Request parsing
│   ├── response.py      # Response building
│   └── utils.py         # Helper functions
├── tests/
│   ├── test_server.py
│   ├── test_handler.py
│   └── fixtures/
├── examples/
│   ├── hello_world.py
│   └── middleware_demo.py
├── docs/
├── setup.py
├── README.md
└── requirements.txt

2. Core Implementation

2.1 Main Server Class (server.py)

import socket
import select
import threading
from wsgiref.handlers import BaseHandler

class WSGIServer:
    """WSGI-compliant HTTP server"""

wsgiserver 0.2
# Give server time to start import time time.sleep(0.5) def __init__(self, app): self
def __init__(self, host='127.0.0.1', port=8000, application=None):
    """Store the listen address and the application to serve.

    Args:
        host: Address the listening socket is bound to.
        port: TCP port the listening socket is bound to.
        application: Object handed to ``WSGIHandler`` for every accepted
            connection (presumably a WSGI callable -- confirm against
            ``WSGIHandler``).
    """
    self.host = host
    self.port = port
    self.application = application
    self.server_socket = None  # created by bind_and_listen()
    self.running = False  # loop flag checked by serve_forever()
    self.connections = []  # not touched by any method visible here


def bind_and_listen(self):
    """Create and bind server socket.

    Opens an IPv4 TCP socket with ``SO_REUSEADDR`` set, binds it to
    ``(self.host, self.port)``, listens with a backlog of 5 and switches the
    socket to non-blocking mode.
    """
    self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.server_socket.bind((self.host, self.port))
    self.server_socket.listen(5)
    self.server_socket.setblocking(False)


def serve_forever(self):
    """Main server loop.

    Binds the listening socket, announces the address on stdout, then polls
    the socket until ``self.running`` becomes false, passing every accepted
    connection to ``handle_client``.
    """
    self.bind_and_listen()
    self.running = True
    # The placeholder braces were missing, so the literal text
    # "self.host:self.port" was printed instead of the real address.
    print(f"Serving on http://{self.host}:{self.port}")
    while self.running:
        try:
            # One-second timeout so the loop re-checks self.running regularly.
            readable, _, _ = select.select([self.server_socket], [], [], 1)
            if readable:
                client_socket, addr = self.server_socket.accept()
                self.handle_client(client_socket, addr)
        except Exception as e:
            # Best-effort loop: report the failure and keep serving.
            # (Previously printed the literal "Error: e".)
            print(f"Error: {e}")


def handle_client(self, client_socket, addr):
    """Handle incoming client connection.

    Args:
        client_socket: Socket returned by ``accept()`` for this client.
        addr: Peer address returned by ``accept()``.
    """
    handler = WSGIHandler(client_socket, addr, self.application)
    handler.handle_request()


def shutdown(self):
    """Gracefully shutdown server.

    Clears the ``running`` flag so ``serve_forever`` stops looping and closes
    the listening socket if one was created.
    """
    self.running = False
    if self.server_socket:
        self.server_socket.close()


import os
import sys
from io import BytesIO


class WSGIHandler:
    """Handle individual HTTP requests"""

# NOTE(review): the source line ends with this truncated fragment, kept here
# as a comment because it does not parse: _ = select.select([self.server_socket]
# Run the server loop on a background daemon thread so it neither blocks the
# caller nor keeps the interpreter alive at exit.
server = WSGIServer(host='127.0.0.1', port=8889, application=app)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()