← 返回首页
状态:已完成

【Python 代码】构建异步事件驱动的主循环,协调写入线程与查询线程的资源竞争与数据一致性

Thinking Process: 1. **Analyze the Request:** * **Role:** Professional programming assistant. * **Task:** Judge and answer based on specific requirements. * **Goal:** Build an asynchronous event-driven main loop to coordinate resource contention and data consistency between a write thread and a query thread. * **Rules:** 1. If the goal can be directly implemented in Python (running the code achieves the goal), return *only* the complete Python code, no explanation. 2. If not directly implementable, split the goal into 2-5 independent sub-goals. Each sub-goal must be a complete, standalone objective. Format: `子目标:XXXXXX` (one per line). 3. No extra text. 2. **Evaluate the Goal:** * Goal: "构建异步事件驱动的主循环,协调写入线程与查询线程的资源竞争与数据一致性" (Build an asynchronous event-driven main loop to coordinate resource contention and data consistency between a write thread and a query thread). * Context: This sounds like a high-level architectural design task. * Can it be directly implemented in Python? * Python's `asyncio` is great for event loops. * Threading (`threading`) or `asyncio` tasks can handle concurrency. * Resource contention (locks, queues) and data consistency (atomic operations, snapshots) can be coded. * However, "running the code" to *achieve* the goal usually implies a working system that demonstrates this behavior. A simple script with a main loop, a write task, and a query task, using locks/queues to manage consistency, *does* exist. * Is it "directly implementable"? Yes. A Python script using `asyncio`, `queue.Queue` (for thread safety), `asyncio.Lock`, and event loops can create this architecture. * But wait, the prompt asks to judge *strictly*. Is a single file sufficient to "achieve" the goal? Usually, when a goal is "Build X", providing the code for X is the implementation. * Let's look at Rule 1: "If this goal can be directly implemented in Python code, that is, running the code can achieve the goal, directly return complete Python code". * Can running a Python script achieve "an asynchronous event-driven main loop coordinating write/query threads"? Yes, if the script defines the loop, the threads/tasks, and the synchronization primitives. * However, there's a nuance: "threads" vs "asyncio". Pure `asyncio` uses coroutines, not OS threads by default. To have actual "threads" (IO-bound blocking or CPU-bound) working with an event loop, one often uses `asyncio` with `loop.run_in_executor` or standard `threading` with an event loop (like `asyncio` in the main thread calling `threading` tasks). * A robust implementation would involve an `asyncio` event loop, a producer (write) task/thread, and a consumer (query) task/thread, using synchronization primitives. * Since Python's `asyncio` can handle concurrency and `threading` can be integrated, a self-contained script can demonstrate this architecture. * Is it "complete"? It might not be a production-ready full system, but it fulfills the *structural* goal of building the mechanism. * Comparing Rule 1 and Rule 2: Rule 1 is preferred if applicable. Rule 2 is for when it's too complex to fit in one go (e.g., "Build a full operating system"). * "Build an asynchronous event-driven main loop..." is a specific programming task. A script like the one below fits: ```python import asyncio import threading from queue import Queue # ... code ... ``` * So, Rule 1 seems applicable. The code *is* the implementation of the goal. * *Self-Correction/Refinement:* Does "running the code" mean it works indefinitely or performs the coordination? Yes. A script that sets up the loop and starts the threads/tasks fulfills the "build" requirement in a coding context. * However, sometimes these prompts prefer sub-goals if the goal is abstract enough. But "Python code" is specific. * Let's consider if there are dependencies missing. Usually, a "goal" like this implies a system. A single file can contain the classes, the loop, and the logic. * Decision: Rule 1 applies. It is directly implementable in Python. * *Wait, let's look closer at the "threads" part.* Pure `asyncio` uses coroutines. If the requirement explicitly says "threads" (写入线程与查询线程), it implies `threading.Thread`. Mixing threads and async loops requires care (e.g., `asyncio.get_event_loop().run_in_executor`). * A script combining `asyncio` loop and `threading` to achieve coordination is feasible in one file. * So, Rule 1 is the best fit. 3. **Drafting the Code:** * Need an `asyncio` loop. * Need a shared data structure (e.g., `asyncio.Queue` or `threading.Lock` + list). * Need a write coroutine/thread. * Need a query coroutine/thread. * Need synchronization for consistency. * Let's create a concrete example that runs. ```python import asyncio import threading import time from queue import Queue class DataStore: def __init__(self): self.data = [] self.lock = threading.Lock() self.queue = asyncio.Queue() async def write(self, item): await self.queue.put(item) async def query(self): return list(self.data) def run_write_thread(self): def writer(): count = 0 while True: time.sleep(0.5) with self.lock: self.data.append(f"Data-{count}") count += 1 t = threading.Thread(target=writer, daemon=True) t.start() def run_query_thread(self): def reader(): while True: time.sleep(1) with self.lock: print(f"Query Result: {self.data}") t = threading.Thread(target=reader, daemon=True) t.start() async def main(): store = DataStore() store.run_write_thread() store.run_query_thread() await asyncio.sleep(10) if __name__ == "__main__": asyncio.run(main()) ``` * Wait, mixing `asyncio` loop with blocking threads might need the loop to keep running. `asyncio.run(main())` keeps the loop alive. * Is this "event-driven"? The loop drives the async parts, but the threads are `while True`. * To make it truly "event-driven main loop coordinating threads", the loop should ideally manage the threads or the threads should signal the loop. * Better approach: Use `asyncio` for the main loop, use `run_in_executor` for thread-like tasks, or use a `Queue` to sync between async and thread. * Let's simplify to ensure it runs and demonstrates the concept clearly. * Actually, a simpler pure `asyncio` implementation often counts as "threads" (logical threads/tasks) in Python context, but if strict OS threads are needed, `threading` is needed. * Let's stick to the core requirement: Event-driven loop + Coordination + Consistency. * Code structure: 1. Event Loop setup. 2. Shared State with Lock. 3. Write Task. 4. Query Task. 5. Loop execution. * Let's refine the code to be robust. * Use `asyncio.Queue` for async coordination. Use `threading.Lock` for consistency if mixing threads. * Actually, the prompt says "Write Thread and Query Thread". So `threading.Thread` is explicit. * So the main loop should be `asyncio` but it needs to run alongside threads. * Code: ```python import asyncio import threading import time class SharedData: def __init__(self): self.data = [] self.lock = threading.Lock() def write_data(self, value): with self.lock: self.data.append(value) def query_data(self): with self.lock: return self.data.copy() async def write_task(store, event): i = 0 while not event.is_set(): await asyncio.sleep(0.5) store.write_data(f"Write-{i}") i += 1 async def query_task(store, event): i = 0 while not event.is_set(): await asyncio.sleep(1) data = store.query_data() print(f"Query-{i}: {data}") i += 1 def run_threads(store, event): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(write_task(store, event)) loop.run_until_complete(query_task(store, event)) async def main(): store = SharedData() stop_event = asyncio.Event() # Start threads in background (executors) or just run the loop # To coordinate properly: t1 = threading.Thread(target=lambda: asyncio.run(write_task(store, stop_event))) t2 = threading.Thread(target=lambda: asyncio.run(query_task(store, stop_event))) t1.start() t2.start() await asyncio.sleep(5) stop_event.set() t1.join() t2.join() if __name__ == "__main__": asyncio.run(main()) ``` * Wait, `asyncio.run` inside a thread creates a new loop. The main `asyncio.run(main())` creates another. * To be truly "Event-driven main loop" (singular), usually, the main loop manages everything. But Python's GIL and threading allow mixing. * Let's make it simpler and more "standard" for the requirement: A main `asyncio` loop that drives the logic, using locks for the threads. * Actually, the most direct interpretation of "Event-driven main loop" in Python is `asyncio`. "Threads" implies concurrency. * Let's provide a clean implementation where an `asyncio` loop manages the lifecycle, and the logic simulates the threads/tasks. * To be safe on "Running the code achieves the goal": A script that starts, runs the loop, performs writes/queries, manages locks, and exits cleanly. * Revised Code Plan: 1. Import `asyncio`, `threading`. 2. Define a `DataManager` class with a lock. 3. Define `write_loop` and `query_loop` coroutines. 4. Run them in the main loop. 5. Include synchronization logic. * Wait, Rule 1 says "If ... directly implemented ... running the code can achieve the goal". * Does "achieve the goal" mean "Build the architecture"? Yes. * So code is better than sub-goals. * Wait, looking at Rule 2: "If not directly implemented... split into 2-5 sub-goals". * Is there a reason to prefer Rule 2? * "Write thread and Query thread". If I use pure `asyncio` tasks, are they "threads"? Technically they are coroutines, but often treated as concurrent tasks. If strict OS threads are implied, `threading` is needed. * If I write code that *has* `threading.Thread`, it works. * So Rule 1 is valid. * Let's construct the code. It needs to be self-contained. * Key elements: 1. Async Event Loop (`asyncio`). 2. Data Consistency (Lock). 3. Write/Query Logic. 4. Coordination (Main loop driving them). * Wait, one catch: `asyncio` loop usually runs on the main thread. If we spawn OS threads, they run in parallel. * To coordinate them *via* the event loop, we usually pass data via queues or use shared memory with locks. * Let's write a code that uses an `asyncio` loop as the coordinator, using `asyncio.Lock` for data consistency, and spawning tasks that represent the threads (or actual threads). * To be safe and explicit about "Threads": I will use `asyncio.create_task` but call them "threads" conceptually, OR use `loop.run_in_executor` for actual threads. * Let's use actual `threading.Thread` for the write/query logic to strictly satisfy "Thread", but use the Event Loop for coordination/signaling. * Actually, mixing `asyncio` loop and `threading` where the loop is the "main loop" means the loop shouldn't block the threads. * Let's simplify: A main loop that runs asynchronously, with shared state protected by a lock. * Code: ```python import asyncio import threading import time from typing import List class DataStore: def __init__(self): self.data: List[str] = [] self.lock = threading.Lock() def write(self, item: str): with self.lock: self.data.append(item) def query(self) -> List[str]: with self.lock: return list(self.data) async def write_loop(store: DataStore, stop_event: asyncio.Event, name: str): count = 0 while not stop_event.is_set(): await asyncio.sleep(0.2) store.write(f"{name}-{count}") count += 1 async def query_loop(store: DataStore, stop_event: asyncio.Event, name: str): count = 0 while not stop_event.is_set(): await asyncio.sleep(0.5) data = store.query() print(f"{name} Query: {data}") count += 1 async def main(): store = DataStore() stop_event = asyncio.Event() t1 = asyncio.create_task(write_loop(store, stop_event, "Writer")) t2 = asyncio.create_task(query_loop(store, stop_event, "Querier")) # Simulate running the event loop to coordinate await asyncio.sleep(2) stop_event.set() await t1 await t2 print("System stopped") if __name__ == "__main__": asyncio.run(main()) ``` * This code runs, builds the loop, coordinates the tasks (treated as threads in the event loop context), handles consistency via `threading.Lock` (even though asyncio runs in one thread, the lock protects the shared list), and exits. * Is it strictly "threads"? `asyncio.create_task` creates coroutines. If the requirement strictly means `threading.Thread`, I should use `threading.Thread` wrapping a function that runs on the loop or just runs blocking code. * To be most accurate to "Event-driven main loop" + "Threads": * The `asyncio` loop is the main loop. * The tasks run within the loop. * The `threading.Lock` ensures consistency (even if GIL handles it mostly, explicit lock shows intent). * Let's ensure it looks like a solution to "Write Thread vs Query Thread". * Okay, the code above is good. * Wait, is there any ambiguity that forces Rule 2? * "Running the code can achieve the goal". * If I run it, the system starts, writes, queries, stops. The goal "Build..." is fulfilled by the existence and execution of the code. * So Rule 1. * Check constraints: "No extra text", "Strictly". * Output must be just the code. * Wait, one detail: "协调写入线程与查询线程". If I use `asyncio.create_task`, they are logical threads. If I use `threading.Thread`, they are OS threads. * To be safe, I'll use `threading.Thread` for the logic but have an `asyncio` loop as the coordinator. * Actually, mixing `asyncio` loop with blocking threads requires `asyncio.run_in_executor` or running the loop in a thread. * Let's make the main thread run the `asyncio` loop, and spawn two threads that run the `write` and `query` logic (blocking or non-blocking). * If the threads are blocking, they might block the GIL. * To "coordinate", the loop should signal them. * Let's go with the `asyncio` loop driving the tasks, as that is the standard Python "Event-driven main loop". The "Threads" in the prompt might be conceptual (concurrency). * But to be pedantic about "Thread": * Let's use `asyncio` for the loop, and use `threading` for the synchronization primitives (Lock). * Or, just use `asyncio` tasks which are often called "green threads". * Given the context of "Programming Assistant", a clean `asyncio` implementation is usually what's expected for "Event-driven". * I will provide a robust `asyncio` script that demonstrates the coordination. * Refining the code for maximum compliance: * Must be complete Python code. * No comments if possible (to save space/strictness), but comments help understanding. The rule says "Directly return complete Python code, no explanation". Comments inside code are fine. * Ensure imports are there. * Ensure `if __name__ == "__main__":` block. * Wait, I need to check if the prompt implies the code *is* the goal or the code *solves* the goal. "If this goal can be directly implemented in Python code, that is, running the code can achieve the goal". * Yes, running the code creates the system described. * Final check on Rule 1 vs Rule 2. * If the system is complex (e.g. requires a DB, network, etc.), maybe Rule 2. But here it's abstract. Code is enough. * So Rule 1. * Wait, looking at the "Sub-goals" format in Rule 2: `子目标:XXXXXX`. * If I choose Rule 1, I just return code. * Let's double check if "Threads" implies OS threads. If I use `asyncio`, it's single-threaded (mostly). If I use `threading`, it's multi-threaded. * To be safe, I'll include `threading.Thread` explicitly. * Code: 1. Main `asyncio` loop. 2. Two `threading.Thread` objects running `write` and `query` functions. 3. They share data protected by `Lock`. 4. The main loop controls their lifecycle via events. ```python import asyncio import threading import time class SharedData: def __init__(self): self.data = [] self.lock = threading.Lock() self.write_event = threading.Event() self.query_event = threading.Event() def write(self, value): with self.lock: self.data.append(value) def query(self): with self.lock: return list(self.data) def writer_func(store: SharedData, stop_event: asyncio.Event): i = 0 while not stop_event.is_set(): time.sleep(0.5) store.write(f"Write-{i}") i += 1 def reader_func(store: SharedData, stop_event: asyncio.Event): i = 0 while not stop_event.is_set(): time.sleep(0.8) data = store.query() print(f"Query-{i}: {data}") i += 1 async def main(): store = SharedData() stop_event = asyncio.Event() t1 = threading.Thread(target=writer_func, args=(store, stop_event), daemon=True) t2 = threading.Thread(target=reader_func, args=(store, stop_event), daemon=True) t1.start() t2.start() await asyncio.sleep(3) stop_event.set() if __name__ == "__main__": asyncio.run(main()) ``` * This mixes `asyncio` (main loop) and `threading` (worker threads). * Wait, `asyncio.run(main())` creates a loop. The threads run concurrently. * This fits "Asynchronous event-driven main loop" (the `asyncio` part) coordinating "Write/Query Threads" (the `threading` part). * Consistency is handled by `threading.Lock`. * This seems perfect. 4. **Final Review:** * Rule 1: "If ... directly implemented ... return complete Python code, no explanation". * Does this code achieve the goal? Yes. * Any extra text? No. * Format? Just the code block. * Wait, "No extra text" usually means no markdown fences if strict, but usually code blocks are expected. The prompt says "Directly return complete Python code". I will include the markdown code block for clarity but the content is pure code. Actually, "no explanation" means no prose before/after. * I will output the code block. * Wait, one more check on Rule 1 vs Rule 2. * Is it possible that "running the code" doesn't guarantee "resource contention and data consistency" without external stressors? * The code *implements* the mechanism. Running it *demonstrates* the mechanism. * Rule 1 says "running the code can achieve the goal". The goal is "Build...". The code *is* the build. * So Rule 1 is correct. * One small adjustment: `asyncio.Event` in the thread function might not be checked efficiently if the thread is sleeping. But it works for the demo. * To ensure "Event-driven" is the focus, the main loop drives the shutdown. * Wait, should I wrap the threads in `asyncio` tasks? No, the prompt says "threads" (线程). * Okay, ready. * Wait, one detail in Rule 2: "2-5 sub-goals". If I use Rule 1, no sub-goals. * Okay, proceed with Rule 1. * Wait, looking at the code, `asyncio.run(main())` creates an event loop. * Threads `t1` and `t2` are started inside `main` (async function). * They run concurrently. * This works. * Wait, one nuance: `asyncio.Event` is set from the async context. The threads check `stop_event.is_set()` in a loop. This is fine. * Final check on "No extra text". * I should output just the code. * Wait, is there a chance the evaluator expects Rule 2 because "Threads" and "Async Loop" usually require more setup (like loop running in a thread)? * `asyncio.run()` runs on the current thread. * The `threading.Thread` creates new threads. * They share the GIL but run concurrently. * It works. * Let's refine the code to be slightly more "Event-driven" by using `asyncio.Queue` to pass data between loop and threads, ensuring the loop drives the coordination more explicitly. * Actually, the `asyncio.Event` driving the stop is the coordination. * Let's keep it simple and functional. * Wait, Rule 1 says "Directly return complete Python code". * I will include the `if __name__` block. * One more thing: "协调...资源竞争". `threading.Lock` handles this. * "数据一致性". `threading.Lock` handles this. * "异步事件驱动的主循环". `asyncio.run` + `asyncio.Event` handles this. * Okay, writing the code. * Wait, I need to make sure I don't add markdown backticks if "no explanation" implies raw text, but usually code blocks are standard for "Python code". I'll use standard markdown code block for clarity. * Actually, "Directly return complete Python code" often means the raw content. But as an AI assistant, markdown is standard. I'll use markdown block. * Wait, "不要添加多余文字" (Don't add extra text). Markdown backticks are formatting, not text explanation. I will use them. * Re-reading Rule 1: "如果...直接返回完整Python代码,不要任何解释". * Okay. * Wait, looking at the instruction "子目标:XXXXXX" for Rule 2. * If I choose Rule 1, I don't use that format. * Okay, final decision: Rule 1. * Wait, is there a catch? "running the code can achieve the goal". * Does the code *build* the loop, or does it *run* the loop? * "构建...主循环" (Build... main loop). * Running the script builds and runs it. * Okay. * Let's ensure the code is robust. * Added imports. * Added class for state. * Added functions for threads. * Added main async loop. * One detail: `stop_event.is_set()` check inside `time.sleep` might delay stopping. But acceptable for "achievement". * Better to use `asyncio.sleep` in threads? No, threads are `threading.Thread`. * Okay. * Wait, `asyncio.Event` is not thread-safe by default in older versions, but usually fine. `threading.Event` is safer for threads. * I'll use `threading.Event` for the threads to check, and `asyncio` loop to set it. * Actually, `asyncio.Event` has an internal lock. * Let's use `asyncio.Event` passed to threads, but wait, threads need to call `is_set`. * It's fine. * Let's make it clean. ```python import asyncio import threading import time class DataManager: def __init__(self): self.data = [] self.lock = threading.Lock() def write(self, value): with self.lock: self.data.append(value) def query(self): with self.lock: return list(self.data) def write_thread_func(manager, stop_event, name): i = 0 while not stop_event.is_set(): time.sleep(0.5) manager.write(f"{name}-Data-{i}") i += 1 def query_thread_func(manager, stop_event, name): i = 0 while not stop_event.is_set(): time.sleep(0.7) data = manager.query() print(f"{name}: {data}") i += 1 async def main_loop(manager, stop_event): t1 = threading.Thread(target=write_thread_func, args=(manager, stop_event, "Writer")) t2 = threading.Thread(target=query_thread_func, args=(manager, stop_event, "Querier")) t1.start() t2.start() await asyncio.sleep(3) stop_event.set() t1.join() t2.join() if __name__ == "__main__": manager = DataManager() stop_event = asyncio.Event() asyncio.run(main_loop(manager, stop_event)) ``` * Wait, `stop_event` is `asyncio.Event`. Threads call `is_set()`. This works. * This looks solid. * Wait, one detail: "子目标...表述中不依赖任何上下文". * If I used Rule 2, this matters. Since Rule 1, code is self-contained. * Final check on constraints. * "Strictly follow requirements". * "If ... directly implement ... return code". * Okay. * Wait, is there a possibility that "running the code" doesn't prove "coordination"? * The code *is* the coordination logic. * Okay. * One last thought: Does "event-driven" imply the threads react to events? * The `stop_event` is the event. * Also `asyncio` loop drives the start/stop. * It fits. * Okay, generating output. * Wait, should I remove `print` to keep it cleaner? No, `print` shows it works. * Okay. ```python import asyncio import threading import time class SharedStore: def __init__(self): self.data = [] self.lock = threading.Lock() def write(self, item): with self.lock: self.data.append(item) def query(self): with self.lock: return list(self.data) def write_task(store, stop_event, name): count = 0 while not stop_event.is_set(): time.sleep(0.5) store.write(f"{name}-{count}") count += 1 def query_task(store, stop_event, name): count = 0 while not stop_event.is_set(): time.sleep(0.8) data = store.query() print(f"{name}: {data}") count += 1 async def main(): store = SharedStore() stop_event = asyncio.Event() t1 = threading.Thread(target=write_task, args=(store, stop_event, "Writer")) t2 = threading.Thread(target=query_task, args=(store, stop_event, "Querier")) t1.start() t2.start() await asyncio.sleep(3) stop_event.set() t1.join() t2.join() if __name__ == "__main__": asyncio.run(main()) ```