今天是一篇关于node,socket,webrtc的内容,先复习一下node stream的基本知识
stream 经典案例
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182const Readable = require("stream").Readable;var Writable = require("stream").Writable;var Transform = require("stream").Transform;const fs = require("fs");const path = require("path");var util = require("util");function DatabaseWriteStream(options) {if (!(this instanceof DatabaseWriteStream))return new DatabaseWriteStream(options);if (!options) options = {};options.objectMode = true;Writable.call(this, options);}util.inherits(DatabaseWriteStream, Writable);DatabaseWriteStream.prototype._write = function write(doc, encoding, callback) {console.log("fsf");fs.writeFileSync(path.resolve(__dirname, "./test.txt"), doc, { flag: "a" });callback();};function JSONEncode(options) {if (!(this instanceof JSONEncode)) return new JSONEncode(options);if (!options) options = {};options.objectMode = true;Transform.call(this, options);}util.inherits(JSONEncode, Transform);JSONEncode.prototype._transform = function _transform(obj, encoding, callback) {try {obj = "temp" + "" + obj;} catch (err) {return callback(err);}this.push(obj);callback();};function Thermometer(options) {if (!(this instanceof Thermometer)) return new Thermometer(options);if (!options) options = {};console.log("www");options.objectMode = true;Readable.call(this, options);}util.inherits(Thermometer, Readable);const getTemperatureReadingFromThermometer = fn => {const temperature = (Math.random() * 39) | 0;fn(null, temperature);};let count = 0;Thermometer.prototype._read = function read() {var self = this;getTemperatureReadingFromThermometer(function(err, temperature) {if (err) self.emit("error", err);else {if (count >= 10) self.unpipe();count++;self.push(temperature);}});};var thermomether = Thermometer();var db = DatabaseWriteStream();const json = JSONEncode();thermomether.pipe(json).pipe(db);thermomether.on("data", function(temp) {console.log("temp:", temp);db.write(`${temp}\n`);thermomether.pause();setTimeout(function() {thermomether.resume();}, 2000);Ï});正式介绍web-rtc的实现
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879const socket = io("/");const videoGrid = document.getElementById("video-grid");const myPeer = new Peer(undefined, {host: "/",port: "3001"});// mypeer打开就要加入socketmyPeer.on("open", id => {socket.emit("join-room", ROOM_ID, id);});const myVideo = document.createElement("video");myVideo.muted = true; // 自己静音const peers = {};function addVideoStream(video, stream) {video.srcObject = stream;video.addEventListener("loadedmetadata", () => {video.play();});videoGrid.append(video);}navigator.mediaDevices.getUserMedia({video: true,audio: true}).then(stream => {// 一进来就加载自己的音视频addVideoStream(myVideo, stream);// 响应呼叫myPeer.on("call", call => {// 把自己的stream回传过去call.answer(stream);const video = document.createElement("video");call.on("stream", userVideoStream => {addVideoStream(video, userVideoStream);});});// 1 新用户进来socket.on("user-connected", userId => {connectToNewUser(userId, stream);});});socket.on("user-disconnected", userId => {if (peers[userId]) peers[userId].close();});Ïfunction connectToNewUser(userId, stream) {// 呼叫对应userId的peerconst call = myPeer.call(userId, stream);const video = document.createElement("video");// 接受对方发送过来的streamcall.on("stream", userVideoStream => {addVideoStream(video, userVideoStream);});// socket关掉后 video也需要移除call.on("close", () => {video.remove();});peers[userId] = call;}// 这是服务端的代码io.on('connection', socket => {socket.on('join-room', (roomId, userId) => {socket.join(roomId)socket.to(roomId).broadcast.emit('user-connected', userId)socket.on('disconnect', () => {socket.to(roomId).broadcast.emit('user-disconnected', userId)})})})
以上就是所有,之前讲过socket加入房间和广播消息,这里有用到