Memory efficient message chunk processing using an XMLHttpRequest
Problem description
I have an XMLHttpRequest with a progress event handler that is requesting a chunked page which continuously sends additional message chunks. If I do not set a responseType, I can access the response property of the XMLHttpRequest in each progress event and handle the additional message chunk. The problem with this approach is that the browser must keep the entire response in memory, and eventually the browser will crash due to this memory waste.
So, I tried a responseType of arraybuffer in the hope that I could slice the buffer and prevent the excessive memory use described above. Unfortunately, the progress event handler is then no longer capable of reading the response property of the XMLHttpRequest. The event parameter of the progress event does not contain the buffer either. Here is a short, self-contained example of my attempt at this (it is written for node.js):
var http = require('http');

// -- The server.

http.createServer(function(req, res) {
  if (req.url === '/stream') return serverStream(res);
  serverMain(res);
}).listen(3000);

// -- The server functions to send a HTML page with the client code, or a stream.

function serverMain(res) {
  res.writeHead(200, {'Content-Type': 'text/html'});
  res.write('<html><body>Hello World</body><script>');
  res.end(client.toString() + ';client();</script></html>');
}

function serverStream(res) {
  res.writeHead(200, {'Content-Type': 'text/html'});
  setInterval(function() {
    res.write('Hello World<br />\n');
  }, 1000);
}

// -- The client code which runs in the browser.

function client() {
  var xhr = new XMLHttpRequest();
  xhr.addEventListener('progress', function() {
    if (!xhr.response) return console.log('progress without response :-(');
    console.log('progress: ' + xhr.response.size);
  }, false);
  xhr.open('GET', '/stream', true);
  xhr.responseType = 'arraybuffer';
  xhr.send();
}
The progress event handler has no access to the response I wanted. How can I handle the message chunks in the browser in a memory-efficient way? Please do not suggest a WebSocket. I do not wish to use one just to process a read-only stream of message chunks.
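For clarity, the working but memory-hungry variant without a responseType would look roughly like this (a sketch only; the function name is illustrative and it assumes the same '/stream' endpoint as above):

function clientText() {
  var xhr = new XMLHttpRequest();
  xhr.addEventListener('progress', function() {
    // With the default responseType ('', equivalent to 'text'), xhr.response
    // holds the full text received so far, so new chunks can be handled here...
    console.log('received so far: ' + xhr.response.length + ' characters');
    // ...but the string can never be truncated, so memory use keeps growing.
  }, false);
  xhr.open('GET', '/stream', true);
  // responseType is intentionally left at its default
  xhr.send();
}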
Recommended answer
XMLHttpRequest doesn't really seem designed for this kind of usage. The obvious solution is polling, which is a popular use of XMLHttpRequest, but I'm guessing you don't want to miss data from your stream that would slip in between the calls.
To my question, "Can the 'real' data chunks be identified in some way or is it basically random data?", you answered, "With some effort, the chunks could be identified by adding an event-id of sorts to the server-side."
Based on this premise, I propose:
1. Connect to the stream and set up the progress listener (referred to as listenerA()).
2. When a chunk arrives, process it and output it. Keep a reference to the ids of both the first and last chunk received by listenerA(). Count how many chunks listenerA() has received.
3. After listenerA() has received a certain number of chunks, spawn another "thread" (connection + listener, listenerB()) doing steps 1 and 2 in parallel to the first one, but keep the processed data in a buffer instead of outputting it.
4. When listenerA() receives the chunk with the same id as the first chunk received by listenerB(), send a signal to listenerB(), drop the first connection and kill listenerA().
5. When listenerB() receives the termination signal from listenerA(), dump the buffer to the output and keep processing normally.
6. Have listenerB() spawn listenerC() on the same conditions as before.
7. Keep repeating with as many connections + listeners as necessary.
By using two overlapping connections, you can prevent the possible loss of chunks that would result from dropping a single connection and then reconnecting.
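As a rough sketch of the overlap check in steps 4 and 5 (the function name and the sample ids below are illustrative, not taken from the proof of concept further down, which performs the same comparison on its list of fetchers): once the old listener has received the chunk that its successor received first, the two connections overlap and the old one can be dropped without losing data.

// Illustrative helper: decide whether the oldest connection can be dropped,
// given the chunk ids each listener has received so far.
function shouldDropOldListener(oldChunkIds, successorChunkIds) {
  // No overlap is possible until the successor has received its first chunk.
  if (successorChunkIds.length === 0) return false;
  // Overlap exists once the old listener has also seen the successor's first chunk.
  return oldChunkIds.indexOf(successorChunkIds[0]) > -1;
}

// Example: the old listener saw chunks 18..25, the successor started at chunk 24.
shouldDropOldListener([18, 19, 20, 21, 22, 23, 24, 25], [24, 25]); // true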
- This assumes the data stream is the same for all connections and doesn't introduce individualized settings.
- Depending on the output rate of the stream and the connection delay, the buffer dump during the transition from one connection to another might be noticeable.
- You could also measure the total response size rather than the chunk count to decide when to switch to a new connection.
- It might be necessary to keep a complete list of chunk ids to compare against, rather than just the first and last one, because we can't guarantee the timing of the overlap.
- The responseType of XMLHttpRequest must be set to its default value of "" or "text" to return text. Other data types will not return a partial response. See https://xhr.spec.whatwg.org/#the-response-attribute
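As a minimal sketch of that last note (variable names are illustrative; the proof of concept below uses the same pattern inside its progress listener): with responseType 'text', the new data can be extracted on each progress event by remembering how much of the response has already been processed.

var url = '/stream'; // any endpoint that streams text chunks
var len = 0;         // length of the response already processed
var xhr = new XMLHttpRequest();
xhr.addEventListener('progress', function() {
  // xhr.response is the partial text received so far
  var newData = xhr.response.substr(len);
  len = xhr.response.length;
  console.log('new chunk data: ' + newData);
}, false);
xhr.open('GET', url, true);
xhr.responseType = 'text';
xhr.send();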
The following code is a node.js server that outputs a consistent stream of elements for testing purposes. You can open multiple connections to it; the output will be the same across sessions, minus possible server lag.
http://localhost:5500/stream
will return data where id is an incremented number
http://localhost:5500/streamRandom
will return data where id is a random 40-character-long string. This is meant to test a scenario where the id cannot be relied upon for ordering the data.
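For illustration (the ids below are made-up examples, not captured output), a chunk from /stream and one from /streamRandom look like:

[node id="42"]
[node id="356a192b7913b04c54574d18c28d46e6395428ab"]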
var crypto = require('crypto');

// init + update nodeId
var nodeId = 0;
var nodeIdRand = '0000000000000000000000000000000000000000';
setInterval(function() {
  // regular id
  ++nodeId;
  // random id
  nodeIdRand = crypto.createHash('sha1').update(nodeId.toString()).digest('hex');
}, 1000);

// create server (port 5500)
var http = require('http');
http.createServer(function(req, res) {
  if(req.url === '/stream') {
    return serverStream(res);
  }
  else if(req.url === '/streamRandom') {
    return serverStream(res, true);
  }
}).listen(5500);

// serve nodeId
function serverStream(res, rand) {
  // headers
  res.writeHead(200, {
    'Content-Type' : 'text/plain',
    'Access-Control-Allow-Origin' : '*',
  });
  // remember last served id
  var last = null;
  // output interval
  setInterval(function() {
    // output on new node
    if(last != nodeId) {
      res.write('[node id="'+(rand ? nodeIdRand : nodeId)+'"]');
      last = nodeId;
    }
  }, 250);
}
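To try it out, save the server code above in a file (the name is arbitrary, for example stream-server.js) and run it with node. The Access-Control-Allow-Origin: * header is there so the proof-of-concept page below can be served from a different origin, or simply opened from disk, and still connect to http://localhost:5500.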
Proof of concept, using the above node.js server code:
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
</head>
<body>
<button id="stop">stop</button>
<div id="output"></div>
<script>
/*
Listening to a never ending page load (http stream) without running out of
memory by using concurrent overlapping connections to prevent loss of data,
using only xmlHttpRequest, under the condition that the data can be identified.
listen arguments
    url            url of the http stream
    chunkMax       number of chunks to receive before switching to new connection
listen properties
    output         a reference to a DOM element with id "output"
    queue          an array filled with non-duplicate received chunks and metadata
    lastFetcherId  an incrementing number used to assign an id to new fetchers
    fetchers       an array listing all active fetchers
listen methods
    fire           internal use    fire an event
    stop           external use    stop all connections
    fetch          internal use    starts a new connection
    fetchRun       internal use    initialize a new fetcher object
Usage
var myListen = new listen('http://localhost:5500/streamRandom', 20);
will listen to url "http://localhost:5500/streamRandom"
will switch connections every 20 chunks
myListen.stop()
will stop all connections in myListen
*/
function listen(url, chunkMax) {
// main ref
var that = this;
// output element
that.output = document.getElementById('output');
// main queue
that.queue = [];
// last fetcher id
that.lastFetcherId = 0;
// list of fetchers
that.fetchers = [];
//********************************************************* event dispatcher
that.fire = function(name, data) {
document.dispatchEvent(new CustomEvent(name, {'detail':data}));
}
//******************************************************** kill all fetchers
that.stop = function() {
that.fire('fetch-kill', -1);
}
//************************************************************** url fetcher
that.fetch = function(fetchId, url, fetchRef) {
//console.log('start fetcher #'+fetchId);
var len = 0;
var xhr = new XMLHttpRequest();
var cb_progress;
var cb_kill;
// progress listener
xhr.addEventListener('progress', cb_progress = function(e) {
// extract chunk data
var chunkData = xhr.response.substr(len);
// chunk id
var chunkId = chunkData.match(/id="([a-z0-9]+)"/)[1];
// update response end point
len = xhr.response.length;
// signal end of chunk processing
that.fire('chunk-ready', {
'fetchId' : fetchId,
'fetchRef' : fetchRef,
'chunkId' : chunkId,
'chunkData' : chunkData,
});
}, false);
// kill switch
document.addEventListener('fetch-kill', cb_kill = function(e) {
// kill this fetcher or all fetchers (-1)
if(e.detail == fetchId || e.detail == -1) {
//console.log('kill fetcher #'+fetchId);
xhr.removeEventListener('progress', cb_progress);
document.removeEventListener('fetch-kill', cb_kill);
xhr.abort();
that.fetchers.shift(); // remove oldest fetcher from list
xhr = null;
}
}, false);
// go
xhr.open('GET', url, true);
xhr.responseType = 'text';
xhr.send();
};
//****************************************************** start a new fetcher
that.fetchRun = function() {
// new id
var id = ++that.lastFetcherId;
//console.log('create fetcher #'+id);
// create fetcher with new id
var fetchRef = {
'id' : id, // self id
'queue' : [], // internal queue
'chunksIds' : [], // retrieved ids, also used to count
'hasSuccessor' : false, // keep track of next fetcher spawn
'ignoreId' : null, // when set, ignore chunks until this id is received (this id included)
};
that.fetchers.push(fetchRef);
// run fetcher
that.fetch(id, url, fetchRef);
};
//************************************************ a fetcher returns a chunk
document.addEventListener('chunk-ready', function(e) {
// shorthand
var f = e.detail;
// ignore flag is not set, process chunk
if(f.fetchRef.ignoreId == null) {
// store chunk id
f.fetchRef.chunksIds.push(f.chunkId);
// create queue item
var queueItem = {'id':f.chunkId, 'data':f.chunkData};
// chunk is received from oldest fetcher
if(f.fetchId == that.fetchers[0].id) {
// send to main queue
that.queue.push(queueItem);
// signal queue insertion
that.fire('queue-new');
}
// not oldest fetcher
else {
// use fetcher internal queue
f.fetchRef.queue.push(queueItem);
}
}
// ignore flag is set, current chunk id the one to ignore
else if(f.fetchRef.ignoreId == f.chunkId) {
// disable ignore flag
f.fetchRef.ignoreId = null;
}
//******************** check chunks count for fetcher, threshold reached
if(f.fetchRef.chunksIds.length >= chunkMax && !f.fetchRef.hasSuccessor) {
// remember the spawn
f.fetchRef.hasSuccessor = true;
// spawn new fetcher
that.fetchRun();
}
/***********************************************************************
check if the first chunk of the second oldest fetcher exists in the
oldest fetcher.
If true, then they overlap and we can kill the oldest fetcher
***********************************************************************/
if(
// is this the oldest fetcher ?
f.fetchId == that.fetchers[0].id
// is there a successor ?
&& that.fetchers[1]
// has oldest fetcher received the first chunk of its successor ?
&& that.fetchers[0].chunksIds.indexOf(
that.fetchers[1].chunksIds[0]
) > -1
) {
// get index of last chunk of the oldest fetcher within successor queue
var lastChunkId = that.fetchers[0].chunksIds[that.fetchers[0].chunksIds.length-1]
var lastChunkIndex = that.fetchers[1].chunksIds.indexOf(lastChunkId);
// successor has not reached its parent last chunk
if(lastChunkIndex < 0) {
// discard whole queue
that.fetchers[1].queue = [];
that.fetchers[1].chunksIds = [];
// set ignore id in successor to discard future duplicates
that.fetchers[1].ignoreId = lastChunkId;
}
// there is overlap
else {
/**
console.log('trimming queue start: '+that.fetchers[1].queue.length
+" "+(lastChunkIndex+1)
+" "+(that.fetchers[1].queue.length-1)
);
/**/
var trimStart = lastChunkIndex+1;
var trimEnd = that.fetchers[1].queue.length-1;
// trim queue
that.fetchers[1].queue = that.fetchers[1].queue.splice(trimStart, trimEnd);
that.fetchers[1].chunksIds = that.fetchers[1].chunksIds.splice(trimStart, trimEnd);
//console.log('trimming queue end: '+that.fetchers[1].queue.length);
}
// kill oldest fetcher
that.fire('fetch-kill', that.fetchers[0].id);
}
}, false);
//***************************************************** main queue processor
document.addEventListener('queue-new', function(e) {
// process chunks in queue
while(that.queue.length > 0) {
// get chunk and remove from queue
var chunk = that.queue.shift();
// output item to document
if(that.output) {
that.output.innerHTML += "<br />"+chunk.data;
}
}
}, false);
//****************************************************** start first fetcher
that.fetchRun();
};
// run
var process = new listen('http://localhost:5500/streamRandom', 20);
// bind global kill switch to button
document.getElementById('stop').addEventListener('click', process.stop, false);
</script>
</body>
</html>
