基於TCP協定的 海量使用者即時通訊系統(聊天室) 實驗詳細過程

2020-08-08 13:45:10

學習視訊來源嗶哩嗶哩,寫這篇部落格純粹是爲了複習

一、實驗目的

	1、練習熟悉C/S架構方法及規範
	2、瞭解熟悉基於golang的tcp協定網路程式設計的方法及步驟
	3、瞭解聯繫利用redis第三方外掛,實現在go語言中使用redis
	4.、練習物件導向程式設計
	5、練習掌握golang的goroutine和channel等的使用,加強go
	語言程式設計的熟練度

二 、模組及功能

1.utils

	(1)定義資訊傳遞物件Transfer
	(2)實現從conn讀訊息,寫訊息的方法

2.common

	(1)定義message結構體,伺服器與用戶端資訊傳遞協定
	(2)定義各種型別訊息的結構體,訊息型別存放在message的Type欄位,訊息數據以json格式存放在message的Data欄位
	(2)定義使用者結構體User
	(4)定義訊息型別常數

3.client端

	(1)實現註冊,登錄業務
	(2)實現線上使用者列表,羣發訊息,獲取所有歷史訊息
	(3)實現退出業務

4.server端

	(1)實現通過redis,建立redis連線池,插入使用者資訊,查詢使用者,修改使用者資訊
	(2)實現註冊,登錄的驗證,資訊儲存
	(3)實現簡訊羣發

三、模組實現

1.utils

1.定義用於c/s傳輸訊息的物件,transfer

type Transfer struct{
	Conn net.Conn
	Buf [8096]byte
}

2.給transfer系結從conn讀方法,ReadPkg
爲防止粘包,採用先讀4個位元組的長度,再根據長度,讀真正的數據

func (this *Transfer)ReadPkg()(mes message.Message, err error){
 
	fmt.Println("讀取用戶端發送的數據")
	//(1)先讀取的時長度,並判斷長度對不對
		n, err := this.Conn.Read(this.Buf[:4])
		if n != 4||err != nil{
			if err == io.EOF{
				return
			}else{
				fmt.Println("conn.Read failed ,err", err)
				return
			}
		}
		fmt.Println("讀到的buf :", this.Buf[:4])
		var pkglen uint32
		pkglen = binary.BigEndian.Uint32(this.Buf[:4])
		//(2)根據pkglen讀取mes
		n, err = this.Conn.Read(this.Buf[:pkglen])
		if n != int(pkglen) || err != nil{
			fmt.Println("mes read failed, err:",err)
			return
		}
		//將buf反序列化,特別注意,mes要加&
		err = json.Unmarshal(this.Buf[:pkglen], &mes)
		if err != nil{
			fmt.Println("json.Unmarshal(buf[:pkglen], mes) failed, err:",err)
			return
		}
		return
}

3.系結發訊息的方法,也需要先發訊息長度,再發訊息體。

func (this *Transfer)WritePkg(data []byte)(err error){
	//先發送一個長度
	var pkglen uint32
	pkglen = uint32(len(data))
	//PutUint32(buf[0:4], pkglen), 將uint32轉成byte切片
	binary.BigEndian.PutUint32(this.Buf[0:4], pkglen)
	//發送長度
	n, err := this.Conn.Write(this.Buf[:4])
	if n != 4||err!=nil{
		fmt.Println("conn.Write(buf[:4])failed err :",err)
		return
	}
	//發送data本身
	n, err = this.Conn.Write(data)
	if n != int(pkglen)||err!=nil{
		fmt.Println("conn.Write(data)failed err :",err)
		return
	}
	return
}

2.common

(1)定義message結構體,伺服器與用戶端資訊傳遞協定

//這個是真正要發送給伺服器的訊息
type Message struct{
	Type string `json:"type"`//訊息型別
	Data string `json:"data"`//
}

(2)定義各種型別訊息的結構體,訊息型別存放在message的Type欄位,訊息數據以json格式存放在message的Data欄位

type LoginMes struct{
	UserId int `json:"userid"`
	UserPwd string `json:"userpwd"`
	UserName string `json:"username"`
}

type LoginResMes struct{
	Code int `json:"code"`//返回狀態嗎500表示該使用者未註冊  200表示登陸成功
	UserIds []int		  //儲存使用者id的一個切片
	Error string `json:"error"`//返回錯誤資訊
}

type RegisterMes struct{
	//註冊
	User User`json:"user"`  //就是使用者結構體
}

type RegisterResMes struct{
	Code int `json:"code"`//返回狀態嗎400表示該使用者已經註冊  200表示註冊成功
	Error string `json:"error"`//返回錯誤資訊
}


//爲了配合伺服器端推播使用者狀態變化訊息
type NotifyUserStatusMes struct{
	UserId int `json:"userid"`
	Status int `json:"status"`//使用者的狀態
}

//增加一個SmsMes//發送的訊息
type SmsMes struct{
	Content string`json:"content"`
	User//匿名結構體,繼承
}

(3)定義使用者結構體User

//定義一個使用者的結構體
type User struct{
	//爲了序列化和反序列化成功,必須保證使用者資訊的json字串key和結構體的欄位對應的tag欄位一致
	UserId int`json:"userid"`
	UserPwd string`json:"userpwd"`
	UserName string`json:"username"`
	UserStatus int`json:"userstatus"`
}

(4)定義訊息型別常數

const (
	LoginMesType	=	"LoginMes"
	LoginResMesType	=	"LoginResMes"
	RegisterMesType =   "RegisterMes"
	RegisterResMesType = "RegisterResMes"
	NotifyUserStatusMesType = "NotifyUserStatusMes"
	SmsMesType = "SmsMes"
)

3.client端

1.註冊–register 步驟
(1)連線伺服器,conn, err := net.Dial(「tcp」, 「localhost:8889」)
並defer conn.close()
(2)輸入資訊

	fmt.Println("輸入使用者的id :")
	fmt.Scanf("%d\n",&userId)
	fmt.Println("輸入使用者的密碼 :")
	fmt.Scanf("%s\n",&userPwd)
	fmt.Println("輸入使用者的暱稱 :")
	fmt.Scanf("%s\n",&userName)

(3)範例化message,Type欄位是RegisterMesType,Data是序列化後的RegisterMes

var mes message.Message
	mes.Type = message.RegisterMesType
	var registerMes message.RegisterMes
	registerMes.User.UserId = userId
	registerMes.User.UserPwd = userPwd
	registerMes.User.UserName = userName
	data, err := json.Marshal(registerMes)
	if err != nil{
		fmt.Println("Register.json.Marshal(registerMes)failed, err :", err)
		return
	}
	mes.Data = string(data)

(3)序列號mes,建立transfer範例tf,呼叫writerPkg方法發送給伺服器

	data, err = json.Marshal(mes)
	if err != nil{
		fmt.Println("Register.jjson.Marshal(mes)failed, err :", err)
		return
	}
	tf := &utils.Transfer{
		Conn : conn,
	}
	err = tf.WritePkg(data)
	if err != nil {
		fmt.Println("Register.WritePkg(data) failed, err :", err)
		return
	}

(4)呼叫tf.ReadPkg方法,得到mes,mes的Data反序列後存入新建立的 registerResMes中,驗證伺服器返回的處理結果

mes, err = tf.ReadPkg() // 
	if err != nil{
		fmt.Println("Register.readPkg(conn) failed",err)
		return
	}
	var registerResMes message.RegisterResMes
	err = json.Unmarshal([]byte(mes.Data), &registerResMes)
	if err != nil{
		fmt.Println("json.Unmarshal([]byte(mes.Data), &registerResMes)err,", err)
		return
	}
	if registerResMes.Code == 200{
		fmt.Println("註冊成功,可以重新登陸")
	}else{
		fmt.Println(registerResMes.Error)
	}

2.登錄—land步驟同register相同,只是mes的型別和數據是LoginMesType,LoginMes型別,不做解釋
登錄成功後,表示已經進入聊天室了,這時要開一個協程,與伺服器保持通訊,

go serverProcessMes(conn)

serverProcessMes函數,建立一個transfer範例tf,回圈接受伺服器發來的msg,根據msg的Type,執行相應的業務,如,其他使用者上線,使用者下線,其他使用者發訊息。

func serverProcessMes(conn net.Conn){
	//建立一個transfer範例,不停的讀伺服器發送的訊息
	tf := &utils.Transfer{
		Conn : conn,
	}
	for {
		//用戶端不停的讀取
		fmt.Println("用戶端正在等待讀取伺服器發送的訊息")
		mes, err := tf.ReadPkg()
		if err != nil{
			fmt.Println("tf.ReadPkg()failed, err :", err)
			return
		}
		//如果讀取到訊息,又是下一步的處理邏輯
		//fmt.Println(mes)
		switch mes.Type{
		case message.NotifyUserStatusMesType:
			//處理
			var notifyUserStatusMes *message.NotifyUserStatusMes
			json.Unmarshal([]byte(mes.Data),&notifyUserStatusMes)
			updataUserStatus(notifyUserStatusMes)
		case message.SmsMesType :
			outputGroupMes(&mes)
		default :
			fmt.Println("伺服器返回一個未知型別")
		}
	}
}

3.實現羣發訊息----SendGroupMsg 步驟:
(1)建立一個msg範例,Type是SmsMesType,Data是序列化後的SmsMes範例

	//1.建立一個message.Message
	var mes message.Message
	mes.Type = message.SmsMesType

	//2.建立一個SmsMes
	var smsMes message.SmsMes
	smsMes.Content = content
	smsMes.UserId = CurUser.UserId
	smsMes.UserStatus = CurUser.UserStatus

	//3.序列化
	data, err := json.Marshal(smsMes)
	if err != nil {
		fmt.Println("json.Marshal(smsMes) failed, err :", err)
		return
	}
	mes.Data = string(data)
	data, err = json.Marshal(mes)
	if err != nil {
		fmt.Println("json.Marshal(mes) failed, err :", err)
		return
	}

(2)建立tf,WritePkg

	tf := &utils.Transfer{
		Conn : CurUser.Conn,
	}
	err = tf.WritePkg(data)
	if err != nil{
		fmt.Println("tf.WritePkg(data) failed, err :", err)
		return
	}

4.server端

1.註冊和登錄都要連線redis,先初始化連線池,可以寫一個init()函數,這裏直接在主函數裡寫,redis的Pool有四個欄位,具體在程式碼中

	func initPool(address string, maxIdle, maxActive int, idleTimeout time.Duration){

	pool = &redis.Pool{
		MaxIdle : maxIdle,//最大空閒連線數
		MaxActive : maxActive, //表實和數據庫的最大連線數,0表示不限制
		IdleTimeout : idleTimeout,//最大空閒時間
		Dial: func()(redis.Conn, error){
		return redis.Dial("tcp", address)
		},
	}
}

2.我們用userdao把pool封裝起來,用到redis就從userdao中取一個conn,並在主函數中建立一個範例

type UserDao struct{
	pool *redis.Pool
}

//使用工廠模式,建立一個userdao範例
//連線池必須在程式開始時就建立好了
func NewUserDao(pool *redis.Pool)(userDao *UserDao){
	userDao = &UserDao{
		pool : pool,
	}
	return
}
func initUserDao(){
	model.MyUserDao = model.NewUserDao(pool)
}

3.這裏需要注意一個初始化的順序問題,先initPool,在initUserDao做好前兩個準備之後,寫伺服器監聽函數

	listen, err := net.Listen("tcp", "127.0.0.1:8889")
	defer listen.Close()
	if err !=nil{
		fmt.Println("net.Listen failed, err :", err)
		return
	}

4.回圈等待用戶端連線,並啓動協程與用戶端保持通訊

//等待連線
	for {
		fmt.Println("等待使用者連線伺服器")
		conn, err := listen.Accept()
		if err != nil{
			fmt.Println("listen.Accept() failed, err :", err)
			return
		}
		//一旦連線成功, 啓動一個協程與用戶端保持通訊
		go process(conn)
	}

5.第四步的process,初始化一個Processor,其中封裝了一個conn,有方法serverProcessMes,根據msg的Type欄位,處理登錄,註冊,羣發的方法。processor這是真正處理數據的介面,還有一個process2方法,從連線中得到msg,交給serverprocessmes處理,並通過err,檢測用戶端是否正常退出

func process(conn net.Conn){
	//延時關閉
	defer conn.Close()
	//回圈讀用戶端發送的資訊
	//呼叫總控
	processor := &Processor{
		Conn : conn,
	}
	err := processor.process2()
	if err != nil{
		fmt.Println("用戶端和伺服器端通訊協程錯誤, err :", err)
		return
	}
}
//編寫一個serverProcessMes 函數
//功能: 根據用戶端發送訊息種類的不同,決定呼叫那個函數來處理
func (this *Processor)serverProcessMes(mes *message.Message)(err error){
	switch mes.Type{
	case message.LoginMesType:
		//處理登陸
		//建立一個UserProcess範例
		up := &processes.UserProcess{
			Conn : this.Conn,
		}
		err = up.ServerProcessLogin(mes)
	case message.RegisterMesType:
		//處理註冊
		up := &processes.UserProcess{
			Conn : this.Conn,
		}
		err = up.ServerProcessRegister(mes)
	case message.SmsMesType:
		smsProcess := &processes.SmsProcess{}
		smsProcess.SendGroupMes(mes)
	default :
		fmt.Println("訊息型別不存在,無法處理。。。")
	}
	return
}
func (this *Processor)process2()(err error){
	for {
		//這裏封裝了readpack函數,用於接收數據包mes
		tf := &utils.Transfer{
			Conn : this.Conn,
		}
		mes, err := tf.ReadPkg()
		if err != nil{
			if err == io.EOF{
				fmt.Println("用戶端正常退出,我也退出")
				return err
			}else{
				fmt.Println("readPkg fail, err", err)
				return err
			}
		}
		fmt.Println("mes", mes)

		err = this.serverProcessMes(&mes)
		if err != nil{
			fmt.Println(err)
			return err
		}
	}
}

6.處理登錄業務
給 userdao系結方法Login,得到redis的連線,根據輸入資訊 呼叫getUserByID,入redis查詢使用者,驗證使用者密碼

func(this *UserDao)Login(userId int, userPwd string)(user *User, err error){
	//從userdao的連線池取出一個連線
	conn := this.pool.Get()
	defer conn.Close()
	user, err = this.getUserByID(conn, userId)
	if err != nil{
		return
	}
	//Id存在了,則密碼是否存在
	if user.UserPwd != userPwd{
		err = ERROR_USER_PWD
		return
	}
	return
}
func (this *UserDao)getUserByID(conn redis.Conn, id int)(user *User, err error){

	//通過給定id,去redis查詢這個使用者
	res, err := redis.String(conn.Do("hget", "user",id))
	if err != nil{
		//
		if err == redis.ErrNil{//表示在user雜湊中沒有找到對應id
			err = ERROR_USER_NOTEXISTS
		}

		return
	}
	user = &User{}
	//這裏需要把res反序列化成user範例
	err = json.Unmarshal([]byte(res), user)
	if err != nil{
		fmt.Println("json.Unmarshal failed, err:", err)
		return
	}
	return
}

7.處理註冊業務,與登錄的業務類似,先獲取redis連線,呼叫getuserbyid檢視是否註冊過,沒註冊過就寫入redis

func(this *UserDao)Register(user *message.User)(err error){
	//從userdao的連線池取出一個連線
	conn := this.pool.Get()
	defer conn.Close()
	_, err = this.getUserByID(conn, user.UserId)
	if err == nil{
		err = ERROR_USER_EXISTED
		return
	}
	//Id還沒有註冊過
	data, err := json.Marshal(user)
	if err != nil{
		return
	}
	//入庫
	_, err = conn.Do("hset", "user", user.UserId, string(data))
	if err != nil{
		fmt.Println("入庫錯誤 err :", err)
		return
	}
	return
}

8.處理羣發訊息的業務
建立結構體SmsProcess,實現方法,將msg序列化,然後遍歷所有線上使用者,呼叫SendMesToEachOnlineUser,給每個使用者發送訊息。SendMesToEachOnlineUser就是簡單的根據連線發送json

func (this *SmsProcess)SendGroupMes(mes *message.Message){

	var smsMes message.SmsMes
	err := json.Unmarshal([]byte(mes.Data), &smsMes)
	if err != nil{
		fmt.Println("json.Unmarshal([]byte(mes.Data), &smsMes)failed", err)
		return
	}
	//將mes重新序列化,發送
	data, err := json.Marshal(mes)
	if err != nil {
		fmt.Println("json.Marshal(mes) failed, err :", err)
		return
	}
	//遍歷伺服器端的onlineUSers的map
	//將訊息轉發出去
	for id, up := range userMgr.onlineUsers{
		if id == smsMes.UserId{
			continue
		}
		this.SendMesToEachOnlineUser(data, up.Conn)
	}
}
func (this *SmsProcess)SendMesToEachOnlineUser(data []byte, conn net.Conn){
	//建立一個transfer,發送data
	tf := &utils.Transfer{
		Conn : conn,
	}
	err := tf.WritePkg(data)
	if err != nil {
		fmt.Println("轉發訊息失敗",err)
		return
	}
}

四、測試

1.啓動用戶端,和伺服器
启动客户的短
在这里插入图片描述
2.測試註冊登錄
client1

在这里插入图片描述

server
在这里插入图片描述
client1
在这里插入图片描述
server在这里插入图片描述
再註冊登錄一個使用者:
在这里插入图片描述
發送訊息
在这里插入图片描述
在这里插入图片描述