English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

JavaScript多线程运行库Nexus.js详解

First, if you are not familiar with this project, it is recommended to read a series of articles written previously. If you don't want to read these, don't worry. This will also involve the content of those articles.

Now, let's get started.

Last year, I started implementing Nexus.js, which is based on Webkit/A multi-threaded server-side JavaScript runtime library for the JavaScript core. For a while, I abandoned this endeavor due to some reasons beyond my control, which I won't discuss here, mainly: I can't keep myself working for a long time.

So, let's start by discussing the architecture of Nexus and how it works.

Event loop

No event loop

There is a thread pool with (lock-free) task objects

Every call to setTimeout or setImmediate or creation of a Promise, the task will be queued in the task queue.

When scheduling a task, the first available thread will select the task and execute it.

Processes Promises on CPU cores. The call to Promise.all() will resolve Promises in parallel.

ES6

Supports async/await, and it is recommended to use

Supports for await(...)

Supports destructuring

Supports async try/catch/finally

Module

Does not support CommonJS. (require(...) and module.exports)

All modules use ES6of import/export syntax

Supports dynamic imports through import('file-or-packge

Supports import.meta, for example: import.meta.filename and import.meta.dirname, etc.

Additional feature: supports direct import from URL, for example:

import { h } from 'https://unpkg.com/preact/dist/preact.esm.js';

EventEmitter

Nexus implements an EventEmitter class based on Promise

The event handler is sorted across all threads and executed in parallel.

La valeur de retour de EventEmitter.emit(...) est un Promise, qui peut être résolu en tant qu'array constitué des valeurs retournées par les gestionnaires d'événements.

par exemple :

class EmitterTest extends Nexus.EventEmitter {
 constructor() {
  super();
  for(let i = 0; i < 4; i++)
   this.on('test', value => { console.log(`déclenché le test ${i} !`); console.inspect(value); });
  for(let i = 0; i < 4; i++)
   this.on('returns-a-value', v => `${v + i}`);
 }
}
const test = new EmitterTest();
async function start() {
 await test.emit('test', { payload: 'test 1});
 console.log('le premier test est terminé !');
 await test.emit('test', { payload: 'test 2});
 console.log('le deuxième test est terminé !');
 const values = await test.emit('returns-a-value', 10);
 console.log('le troisième test est terminé, les valeurs retournées sont :'); console.inspect(values);
}
start().catch(console.error);

I/O

tous les entrées/Les sorties sont réalisées par trois opérateurs : Device, Filter et Stream.

tous les entrées/Les opérateurs de sortie implémentent toutes la classe EventEmitter

Pour utiliser Device, vous devez créer un ReadableStream ou un WritableStream au-dessus de Device

Pour manipuler les données, vous pouvez ajouter des Filters à un ReadableStream ou un WritableStream.

Enfin, utilisez source.pipe(...destinationStreams) puis attendez source.resume() pour traiter les données.

tous les entrées/Les opérations de sortie sont toutes réalisées à l'aide d'objets ArrayBuffer.

Filter a essayé de traiter les données avec la méthode process(buffer).

par exemple : utiliser2des fichiers de sortie indépendants contiennent UTF-8convertir en UTF6。

const startTime = Date.now();
 try {
  const device = new Nexus.IO.FilePushDevice('enwik'8');}}
  const stream = new Nexus.IO.ReadableStream(device);
  stream.pushFilter(new Nexus.IO.EncodingConversionFilter("UTF-8", "UTF-16LE"));
  const wstreams = [0,1,2,3]
   .map(i => new Nexus.IO.WritableStream(new Nexus.IO.FileSinkDevice('enwik16-' + i)));
  console.log('piping...');
  stream.pipe(...wstreams);
  console.log('streaming...');
  await stream.resume();
  await stream.close();
  await Promise.all(wstreams.map(stream => stream.close()));
  console.log(`terminé en ${(Date.now * startTime) / 1000} secondes!`);
 } catch (e) {
  console.error('Une erreur s'est produite : ', e);
 }
}
start().catch(console.error);

TCP/UDP

Nexus.js fournit une classe Acceptor, responsable de lier l'adresse IP/Port et écoute de connexion

À chaque réception d'une demande de connexion, l'événement 'connection' est déclenché et fournit un appareil Socket.

Chaque instance de Socket est bidirectionnelle et/O appareil.

Vous pouvez utiliser ReadableStream et WritableStream pour manipuler le Socket.

Le plus simple exemple : (envoyer "Hello World" au client)

const acceptor = new Nexus.Net.TCP.Acceptor();
let count = 0;
acceptor.on('connection', (socket, endpoint) => {
 const connId = count++;
 console.log(`connexion #${connId} depuis ${endpoint.address}:${endpoint.port}`);
 const rstream = new Nexus.IO.ReadableStream(socket);
 const wstream = new Nexus.IO.WritableStream(socket);
 const buffer = new Uint8Array(13);
 const message = 'Hello World!\n';
 for(let i = 0; i < 13; i++)
  buffer[i] = message.charCodeAt(i);
 rstream.pushFilter(new Nexus.IO.UTF8StringFilter());
 rstream.on('data', buffer => console.log(`message reçue: ${buffer}`));
 rstream.resume().catch(e => console.log(`client #${connId} à ${endpoint.address}:${endpoint.port} déconnecté!`));
 console.log(`envoi de la salutation à #${connId}!`);
 wstream.write(buffer);
});
acceptor.bind('127.0.0.1', 10000);
acceptor.listen();
console.log('server ready');

Http

Nexus fournit une classe Nexus.Net.HTTP.Server, qui hérite principalement de TCPAcceptor

Des interfaces de base

Lorsque le serveur a terminé l'analyse des en-têtes HTTP entrants de base/Lors de la vérification, l'événement connection sera déclenché par la connexion et les mêmes informations

Chaque instance de connexion a une requête et un objet de réponse. Ce sont les entrées/Appareil de sortie.

Vous pouvez construire des ReadableStream et WritableStream pour manipuler la requête/response。

Si vous connectez à un objet Response via un tuyau, le flux d'entrée utilisera le mode de codage par morceaux. Sinon, vous pouvez utiliser response.write() pour écrire une chaîne de caractères classique.

Exemple complexe : (un serveur HTTP de base avec le codage en bloc, détails omis)}

....
/**
 * Creates an input stream from a path.
 * @param path
 * @returns {Promise<ReadableStream>}
 */
async function createInputStream(path) {
 if (path.startsWith('/)) // If it starts with '/', omit it.
  path = path.substr(1);
 if (path.startsWith('.')) // If it starts with '.', reject it.
  throw new NotFoundError(path);
 if (path === '/' || !path) // If it's empty, set to index.html.
  path = 'index.html';
 /**
  * `import.meta.dirname` and `import.meta.filename` replace the old CommonJS `__dirname` and `__filename`.
  */
 const filePath = Nexus.FileSystem.join(import.meta.dirname, 'server_root', path);
 try {
  // Stat the target path.
  const {type} = await Nexus.FileSystem.stat(filePath);
  if (type === Nexus.FileSystem.FileType.Directory) // If it's a directory, return its 'index.html'
   return createInputStream(Nexus.FileSystem.join(filePath, 'index.html'));
  else if (type === Nexus.FileSystem.FileType.Unknown || type === Nexus.FileSystem.FileType.NotFound)
   // Si ce n'est pas trouvé, lancez NotFound.
   throw new NotFoundError(path);
 } catch(e) {
  if (e.code)
   throw e;
  throw new NotFoundError(path);
 }
 try {
  // Tout d'abord, nous créons un appareil.
  const fileDevice = new Nexus.IO.FilePushDevice(filePath);
  // Ensuite, nous renvoyons un nouveau Nexus.IO.ReadableStream créé à l'aide de notre appareil source.
  return new Nexus.IO.ReadableStream(fileDevice);
 } catch(e) {
  throw new InternalServerError(e.message);
 }
}
/**
 * Compteur des connexions.
 */
let connections = 0;
/**
 * Créer un nouveau serveur HTTP.
 * @type {Nexus.Net.HTTP.Server}
 */
const server = new Nexus.Net.HTTP.Server();
// Une erreur serveur signifie qu'une erreur s'est produite pendant que le serveur écoutait les connexions.
// Nous pouvons généralement ignorer de tels erreurs, nous les affichons malgré tout.
server.on('error', e => {
 console.error(FgRed + Bright + 'Erreur serveur : ' + e.message + '\n' + e.stack, Reset);
});
/**
 * Écouter les connexions.
 */
server.on('connection', async (connection, peer) => {
 // Commencer avec un ID de connexion de 0, incrémenter à chaque nouvelle connexion.
 const connId = connections++;
 // Enregistrer l'heure de début pour cette connexion.
 const startTime = Date.now();
 // Le déstructuration est pris en charge, pourquoi ne pas l'utiliser?
 const { request, response } = connection;
 // Analyser les parties de l'URL.
 const { path } = parseURL(request.url);
 // Ici, nous stockerons toutes les erreurs qui se produisent pendant la connexion.
 const errors = [];
 // inStream est notre source de fichier ReadableStream, outStream est notre réponse (appareil) enveloppé dans un WritableStream.
 let inStream, outStream;
 try {
  // Enregistrer la requête.
  console.log(`> #${FgCyan` + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${
   FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}"`, Reset);
  // Définir l'en-tête 'Server'.
  response.set('Server', `nexus.js`)/0.1.1`);
  // Créez notre flux d'entrée.
  inStream = await createInputStream(path);
  // Créez notre flux de sortie.
  outStream = new Nexus.IO.WritableStream(response);
  // Hookez tous les événements `error`, ajoutez tous les erreurs à notre tableau `errors`.
  inStream.on('error', e => { errors.push(e); });
  request.on('error', e => { errors.push(e); });
  response.on('error', e => { errors.push(e); });
  outStream.on('error', e => { errors.push(e); });
  // Définir le type de contenu et le statut de la requête.
  response
   .set('Content-Type', mimeType(path))
   .status(200);
  // Connectez l'entrée à la sortie(s).
  const disconnect = inStream.pipe(outStream);
  try {
   // Reprenez notre flux de fichier, cela cause le flux à passer au codage encodé en chunks HTTP.
   // Cela retournera une promesse qui ne sera résolue qu'après que le dernier octet (chunk HTTP) ait été écrit.
   await inStream.resume();
  } catch (e) {
   // Capturez toutes les erreurs qui se produisent pendant le streaming.
   errors.push(e);
  }
  // Déconnectez tous les callbacks créés par `.pipe()`.
  return disconnect();
 } catch(e) {
  // Si une erreur s'est produite, l'ajoutez à l'array.
  errors.push(e);
  // Définissez le type de contenu, le statut et écrivez un message de base.
  response
   .set('Content-Type', 'text/plain')
   .status(e.code || 500)
   .send(e.message || 'Une erreur s'est produite.');
 } finally {
  // Fermez les flux manuellement. C'est important car nous pourrions manquer de gestionnaires de fichiers autrement.
  if (inStream)
   await inStream.close();
  if (outStream)
   await outStream.close();
  // Fermez la connexion, n'a pas d'effet réel avec keep-connexions actives.
  await connection.close();
  // Grab the response's status.
  let status = response.status();
  // Determine what colour to output to the terminal.
  const statusColors = {
   '200': Bright + FgGreen, // Green for 200 (OK),
   '404': Bright + FgYellow, // Yellow for 404 (Not Found)
   '500': Bright + FgRed // Red for 500 (Internal Server Error)
  };
  let statusColor = statusColors[status];
  if (statusColor)
   status = statusColor + status + Reset;
  // Log the connection (and time to complete) to the console.
  console.log(`< #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${
   FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}" ${status} ${(Date.now() * startTime)}ms` +
   (errors.length ? " " + FgRed + Bright + errors.map(error => error.message).join(', ') + Reset : Reset));
 }
});
/**
 * IP and port to listen on.
 */
const ip = '0.0.0.0', port = 3000;
/**
 * Whether or not to set the `reuse` flag. (optional, default=false)
 */
const portReuse = true;
/**
 * Maximum allowed concurrent connections. Default is 128 on my system. (optional, system specific)
 * @type {number}
 */
const maxConcurrentConnections = 1000;
/**
 * Lier l'adresse et le port sélectionnés.
 */
server.bind(ip, port, portReuse);
/**
 * Commencer à écouter les requêtes.
 */
server.listen(maxConcurrentConnections);
/**
 * Happy streaming!
 */
console.log(FgGreen + `Nexus.js serveur HTTP en écoute sur ${ip}:${port}` + Réinitialiser);

Benchmark

Je pense que j'ai couvert tout ce qui a été réalisé jusqu'à présent. Alors parlons maintenant de la performance.

Voici le benchmark actuel de ce serveur HTTP, avec100 connexions concurrentes et au total10000 requêtes :

Ceci est ApacheBench, Version 2.3 <$Révision : 1796539 $>
Droit d'auteur 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licencié à la Fondation Apache, http://www.apache.org/
Benchmarking localhost (patience).....terminé
Logiciel du serveur :    nexus.js/0.1.1
Nom d'hôte du serveur :    localhost
Port du serveur :      3000
Chemin du document :     /
Longueur du document :    8673 octets
Niveau de concurrence :   100
Temps pris pour les tests :  9.991 secondes
Requêtes complètes :   10000
Requêtes échouées :    0
Total transféré :   87880000 octets
HTML transféré :    86730000 octets
Requêtes par seconde :  1000.94 [#/sec] (moyenne)
Temps par requête :    99.906 [ms] (moyenne)
Temps par requête :    0.999 [ms] (moyenne, sur toutes les demandes concurrentes)
Taux de transfert:     8590.14 [Kbytes/sec] reçu
Temps de connexion (ms)
       min moyenne[+/-sd] médiane  max
Connect:    0  0  0.1   0    1
En traitement:   6  99 36.6   84   464
En attente:    5  99 36.4   84   463
Total:     6 100 36.6   84   464
Pourcentage des demandes servies dans un certain temps (ms)
 50%   84
 66%   97
 75%  105
 80%  112
 90%  134
 95%  188
 98%  233
 99%  238
 100%  464 (demande la plus longue)

par seconde1000 demandes.7En haut, dessus s'exécute un certain logiciel de test de base, un qui occupe5G mémoire IDE, ainsi que le serveur lui-même.

voodooattack@voodooattack:~$ cat /proc/cpuinfo 
processor  : 0
vendor_id  : GenuineIntel
cpu family : 6
model    : 60
model name : Intel(R) Core(TM) i7-4770 CPU @ 3.40GHz
stepping  : 3
microcode  : 0x22
cpu MHz   : 3392.093
cache size : 8192 KB
physical id : 0
siblings  : 8
core id   : 0
cpu cores  : 4
apicid   : 0
initial apicid : 0
fpu   : yes
fpu_exception  : yes
cpuid level : 13
wp   : yes
flags    : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln pts
bugs :
bogomips : 6784.18
taille clflush : 64
alignement du cache : 64
tailles d'adresse : 39 bits physiques, 48 bits virtuels
gestion de l'énergie:

J'ai essayé1000 requêtes concurrentes, mais ApacheBench a expiré en raison de nombreux sockets ouverts. J'ai essayé httperf, voici les résultats :

voodooattack@voodooattack:~$ httperf --port=3000 --num-conns=10000 --taux=1000
httperf --client=0/1 --serveur=localhost --port=3000 --uri=/ --taux=1000 --send-buffer=4096 --recv-buffer=16384 --num-conns=10000 --num-appels=1
httperf: avertissement: limite de fichier ouvert > FD_SETSIZE; limitation du nombre maximal de fichiers ouverts à FD_SETSIZE
Longueur maximale d'explosion de connexion: 262
Total: connexions 9779 requêtes 9779 réponses 9779 test-durée 10.029 s
Taux de connexion: 975.1 conn/s (1.0 ms/conn, <=1022 connexions concurrentes)
Temps de connexion [ms]: min 0.5 avg 337.9 max 7191.8 moyenne 79.5 stddev 848.1
Temps de connexion [ms]: connect 207.3
Durée de la connexion [réponses]/conn] : 1.000
Taux de requête : 975.1 req/s (1.0 ms/req)
Taille de la requête [B] : 62.0
Taux de réponse [réponses/s] : min 903.5 avg 974.6 max 1045.7 stddev 100.5 (2 échantillons)
Temps de réponse [ms] : réponse 129.5 transfert 1.1
Taille de réponse [B] : en-tête 89.0 content 8660.0 footer 2.0 (total 8751.0)
Statut de réponse : 1xx=0 2xx=9779 3xx=0 4xx=0 5xx=0
Temps CPU [s] : user 0.35 system 9.67 (user 3.5% system 96.4% total 99.9)
Net I/O: 8389.9 KB/s (68.7*10^6 bps)
Erreurs : total 221 client-timo 0 socket-timo 0 connrefused 0 connreset 0
Erreurs : fd-unavail 221 addrunavail 0 ftab-full 0 other 0

Comme vous le voyez, il fonctionne toujours. Malgré la pression, certains connexions peuvent expirer. Je suis toujours en train d'étudier les causes de ce problème.

Voici le contenu complet de cet article sur l'apprentissage de Nexus.js. Si vous avez des questions, vous pouvez laisser des commentaires ci-dessous pour discuter. Merci de votre soutien au tutoriel d'extase.

Déclaration : le contenu de cet article est issu du réseau, propriété de ses auteurs respectifs, contribué et téléversé par les utilisateurs d'Internet de manière spontanée. Ce site ne détient pas de droits de propriété, n'a pas été édité par l'homme, et n'assume aucune responsabilité juridique connexe. Si vous trouvez du contenu présumé enfreignant les droits d'auteur, veuillez envoyer un email à : notice#oldtoolbag.com (veuillez remplacer # par @ lors de l'envoi d'un email pour signaler une violation, et fournissez des preuves pertinentes. Une fois vérifié, ce site supprimera immédiatement le contenu présumé enfreignant les droits d'auteur.)