diff --git a/seq-aiohttp/main.py b/seq-aiohttp/main.py index 7b504ec..72655d1 100644 --- a/seq-aiohttp/main.py +++ b/seq-aiohttp/main.py @@ -1,56 +1,35 @@ -from aiohttp import web -import asyncio -from scramjet.streams import Stream -from random import randint +import aiohttp import nest_asyncio -# from gpiozero import CPUTemperature, DiskUsage, LoadAverage, PingServer -import functools - -connected = set() - -requires = { - 'requires': 'pi', - 'contentType': 'text/plain' -} +from aiohttp import web +from client.host_client import HostClient -async def root(request): - return web.Response(text="working..") -async def serve(request): - return web.FileResponse('index.html') +api_base ='http://127.0.0.1:8000' +host = HostClient(f'{api_base}/api/v1/') +topic_gen = host.get_named_data('pi') -async def websocket_handler(request, input): +async def handler(request): ws = web.WebSocketResponse() - connected.add(ws) await ws.prepare(request) - async for msg in ws: - for connection in connected: - if msg.type == web.WSMsgType.TEXT: - if msg.data == 'close': - await ws.close() - else: - await connection.send_str(f'ok, data from topic:{await get_from_topic(input)}') - elif msg.type == web.WSMsgType.ERROR: - print(f'ws connection closed with exception {ws.exception()}') + async for chunk in await topic_gen: + await ws.send_str(str(chunk)) + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + if msg.data == 'close': + await ws.close() + else: + await ws.send_str(f'{msg.data} /answer') + elif msg.type == aiohttp.WSMsgType.ERROR: + print(f'ws connection closed with exception {ws.exception()}') print('websocket connection closed') return ws - - -bound_handler = functools.partial(websocket_handler, input='input') - -async def get_from_topic(input): - await asyncio.sleep(1) - topic_data = input.map(lambda s: f'consumer got: {s}').each(print) - return topic_data async def run(context, input): nest_asyncio.apply() app = web.Application() - app.add_routes([web.get('/', root)]) - app.add_routes([web.get('/ws', bound_handler)]) app.add_routes([web.static('/files', './', show_index=True)]) - # web.run_app(app) - asyncio.gather(web.run_app(app), return_exceptions=True) + app.add_routes([web.get('/ws', handler)]) + await web.run_app(app, port=8020) diff --git a/seq-aiohttp/main2_no_seq.py b/seq-aiohttp/main2_no_seq.py new file mode 100644 index 0000000..c8f9d2f --- /dev/null +++ b/seq-aiohttp/main2_no_seq.py @@ -0,0 +1,51 @@ +from aiohttp import web +import asyncio +import functools +from random import randint +from pyee.asyncio import AsyncIOEventEmitter + + +# ee = EventEmitter() +ee = AsyncIOEventEmitter() + +requires = { + 'requires': 'pi', + 'contentType': 'text/plain' +} + +@ee.on('event') +async def event_handler(ws, data): + print(f'TRIGGERED, {data}, {ws}') + await ws.send_str(f'ok, data {data}') + +async def push_to_socket(socket, data): + await socket.send_str(f'pushed: {data}') + +async def root(request): + return web.Response(text="working..") + +async def websocket_handler(request): + ws = web.WebSocketResponse() + await ws.prepare(request) + print('connected') + await mock_from_topic(ws=ws) + async for msg in ws: + # ws.disconnect() # any messages here should kill the connection + print("websocket connection started sending data we dont accept") + print('websocket connection closed') + return ws + +async def mock_from_topic(ws): + while True: + ee.emit('event', ws, randint(0,20)) + await asyncio.sleep(randint(0,4)) + +app = web.Application() +app.add_routes([web.get('/', root)]) +bound_handler = functools.partial(websocket_handler) +app.add_routes([web.get('/ws', bound_handler)]) +app.add_routes([web.static('/files','./', show_index=True)]) +web.run_app(app) + + + diff --git a/seq-aiohttp/requirements.txt b/seq-aiohttp/requirements.txt index 4973af7..919651b 100644 --- a/seq-aiohttp/requirements.txt +++ b/seq-aiohttp/requirements.txt @@ -1,2 +1,3 @@ scramjet-framework-py==0.10 -pyee==9.0.4 \ No newline at end of file +pyee==9.0.4 +aiohttp==3.8.4 diff --git a/seq-rpi-internal/raspberry.py b/seq-rpi-internal/raspberry.py index cf5eb2f..af90e9a 100644 --- a/seq-rpi-internal/raspberry.py +++ b/seq-rpi-internal/raspberry.py @@ -31,7 +31,7 @@ async def set_internals(stream, interval=3, mock=mock): async def run(context, input): stream = Stream() asyncio.gather(set_internals(stream), return_exceptions=True) - return stream.map(lambda x : x + "\n") + return stream.map(lambda x : str(x) + "\n")