分布式各系统时间统一程序

这篇具有很好参考价值的文章主要介绍了分布式各系统时间统一程序。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


1、背景

使用场景是在一个大型分布式系统下,对时间有一个较高的水平要求。因为需要矫正每台运行服务的主机时间。

2、Cristian’s algorithm 算法(克里斯蒂安算法)

Cristian's algorithm 算法是一种时钟同步算法, 用于通过客户端进程与时间服务器同步时间。此算法适用于低延迟网络, 其中往返时间与准确性相比, 它是短的, 而易于冗余的分布式系统/应用程序与该算法不能并存。这里的往返时间是指请求开始到相应响应结束之间的持续时间。
分布式各系统时间统一程序

3、实现思路

令T_0 为客户端获取时钟时间请求的本地时间。
令T_1 为客户端收到服务端回执的包的本地时间。

3.1、步骤:

1、首先客户端发送时钟时间请求包;
2、服务端收到立即给该客户端回执一个服务器本地时间的包;
3、客户端收到服务器发来的回执包,进行误差计算。

3.2、公式

公式:
分布式各系统时间统一程序

其中:
T_0 为客户端获取时钟时间请求的本地时间。
T_1 为客户端收到服务端回执的包的本地时间。
T_server为服务器返回的时钟时间
T_client 同步时钟时间
T_1 - T_0指网络和服务器将请求传输到服务器, 处理请求并将响应返回给客户端进程所花费的总时间;这里假设网络延迟T_0 和T_1大致相等。

客户端的时间最多与服务端时间差(T_1 - T_0)/2。

因此,我们以t作为最大容忍误差。

4、具体代码

4.1、构建时间戳

既然传输的包中需要有本地时间。所以我们需要构建一个适合自己的时间戳,当然你也可以用C++标准库自带的。

struct DateTime
{
	int year;
	int month;
	int day;
	int hour;
	int min;
	int sec;
	int milliSec;
};

为了后续对时间进行处理运算,这里提供了一些便捷的方法。

例如 自定义时间戳转QDatetime、自定义时间戳转SYSTEMTIME等等

	QDateTime DateTime::ToQDateTime() const
	{
		QString str = QString("%1-%2-%3 %4:%5:%6 %7")
			.arg(year).arg(month).arg(day).arg(hour).arg(min).arg(sec).arg(milliSec);
		QDateTime time = QDateTime::fromString(str, "yyyy-M-d h:m:s z");
	
		return time;
	}
	SYSTEMTIME DateTime::ToSystmTime() const
	{
		SYSTEMTIME time;
		time.wYear = year;
		time.wMonth = month;
		time.wDay = day;
		time.wHour = hour;
		time.wMinute = min;
		time.wSecond = sec;
		time.wMilliseconds = milliSec;

		return time;
	}

	void DateTimeFrom::QDateTime(const QDateTime& time)
	{
		year = time.date().year();
		month = time.date().month();
		day = time.date().day();
		hour = time.time().hour();
		min = time.time().minute();
		sec = time.time().second();
		milliSec = time.time().msec();
	}

	DateTime DateTime::operator -(const DateTime& other)
	{
		QDateTime t1 = this->ToQDateTime();
		QDateTime t2 = other.ToQDateTime();
		qint64 d1 = t1.toMSecsSinceEpoch();
		qint64 d2 = t2.toMSecsSinceEpoch();
		QDateTime interval = QDateTime::fromMSecsSinceEpoch(abs(t1.toMSecsSinceEpoch() - t2.toMSecsSinceEpoch()));
		DateTime time;


		time.FromQDateTime(interval);
		time.hour -= 8;
		return time;
	}

	DateTime DateTime::operator +(const DateTime& other)
	{
		QDateTime t1 = this->ToQDateTime();
		QDateTime t2 = other.ToQDateTime();
		QDateTime interval = QDateTime::fromMSecsSinceEpoch(t1.toMSecsSinceEpoch() + t2.toMSecsSinceEpoch());
		DateTime time;
		time.FromQDateTime(interval);
		return time;
	}

	DateTime& DateTime::operator +=(const DateTime& other)
	{
		QDateTime t1 = this->ToQDateTime();
		QDateTime t2 = other.ToQDateTime();

		QDateTime interval = QDateTime::fromMSecsSinceEpoch(t1.toMSecsSinceEpoch() + t2.toMSecsSinceEpoch());
		FromQDateTime(interval);

		return *this;
	}
};

4.2、定义数据包

由于我们只需要一个时间戳并且我们需要标识一下该包是请求还是回执包,以及自身的IP信息。所以这个包将很简单。state的范围:1-请求 2-回执

struct ClockPag
{
	char client[40]{ 0 };
	int state = 0; 
	DateTime time;
};

4.3、客户端实现

头文件 ClockClient.h

#pragma once

#include "DateTime.h"

#include <QUdpsocket>
#include <QNetworkDatagram>
#include <QTimer>
extern int g_step;
constexpr int gc_port = 10023;
constexpr int gc_recvPort = 10024;

class ClockClient : public QObject
{
	Q_OBJECT
public:
	ClockClient();

	void Start(int step = g_step * 1000);
	void Stop();
private:
	bool CorrectionTime();
	void SendLocaltime();
	QString GetLocalIp();
	bool InitNetWork();
public:
	DWORD AppOfSelfCheck(LPCTSTR name);
	HANDLE GetProcessHandleByID(DWORD nID);
	std::vector<DWORD> GetProcessIDByname(const char * name);
private slots:
	void onTimerServer();

private:
	QString m_localIp;
	qint32 m_localPort;

	ClockPag m_clockPag;

	QUdpSocket m_send;
	QUdpSocket m_rece;

	DateTime m_oldTime;
	DateTime m_currTime;
	DateTime m_serviceTime;
	QTimer m_timer;
	QList<QHostAddress> ipAddressesList;
};

源文件 ClockClient.cpp

#include "ClockClient.h"

#include <iostream>
#include <minwindef.h>
#include <QList>
#include <QNetworkInterface>
extern int g_step;
ClockClient::ClockClient()
	: m_localIp{ GetLocalIp() }
	, m_localPort{ gc_port }
{
	bool ret = connect(&m_timer, &QTimer::timeout, this, &ClockClient::SendLocaltime);
	if (false == ret)
	{
		std::cout << "绑定失败!" << std::endl;
		exit(EXIT_FAILURE);
	}
	if (false == InitNetWork())
	{
		std::cout << "网络初始化失败!" << std::endl;
		exit(EXIT_FAILURE);
	}
}


bool ClockClient::InitNetWork()
{
	bool ret = m_rece.bind(gc_recvPort, QUdpSocket::ShareAddress | QUdpSocket::ReuseAddressHint);
	if (false == ret)
	{
		return false;
	}
	ret = connect(&m_rece, &QUdpSocket::readyRead, this, &ClockClient::onTimerServer);

	return ret;
}


void ClockClient::Start(int step)
{
	m_timer.start(step); // step:10s
}

void ClockClient::Stop()
{
	m_timer.stop();
}

void ClockClient::onTimerServer()
{
	while (true == m_rece.hasPendingDatagrams())
	{
		QByteArray ba = m_rece.receiveDatagram().data();

		char* buffer = new char[ba.size()];
		memcpy(buffer, ba.data(), ba.size());
		memset(m_clockPag.client, 0, sizeof(m_clockPag.client));
		m_clockPag = *reinterpret_cast<ClockPag*>(buffer);

		QString ip = m_rece.receiveDatagram().senderAddress().toString();
		//ip = ip.mid(7);
		if (m_localIp != QString(m_clockPag.client)
			|| ip == m_localIp || m_clockPag.state != 2)
		{
			delete[] buffer;
			buffer = nullptr;
			continue;
		}
		m_clockPag.state = 3;
		m_serviceTime = m_clockPag.time;//服务器时间
		m_currTime.FromQDateTime(QDateTime::currentDateTime()); // 本地时间
		delete[] buffer;
		buffer = nullptr;
		std::cout << "设置状态:" << CorrectionTime() << std::endl;
	}
}

bool ClockClient::CorrectionTime()
{
	DateTime rtt = m_currTime - m_oldTime;
	rtt.hour += 8;
	qint64 eee = rtt.ToQDateTime().toMSecsSinceEpoch();
	qint64 rtt2 = rtt.ToQDateTime().toMSecsSinceEpoch() / 2; // 平均延时
	std::cout << "校验周期(毫秒)" << rtt2 << std::endl;
	DateTime midTime;
	midTime.FromQDateTime(QDateTime::fromMSecsSinceEpoch(rtt2));

	DateTime localTime = m_serviceTime + midTime;

	//if(m_serviceTime.ToQDateTime().toMSecsSinceEpoch() >)

	if (localTime.ToQDateTime().toMSecsSinceEpoch() > m_currTime.ToQDateTime().toMSecsSinceEpoch())
	{
		DateTime tmp = localTime - m_currTime;
		tmp.hour += 8;
		std::cout << "本地误差(毫秒)" << tmp.ToQDateTime().toMSecsSinceEpoch() << std::endl;
	}
	else
	{
		DateTime tmp = m_currTime - localTime;
		tmp.hour += 8;
		std::cout << "本地误差(毫秒)" << tmp.ToQDateTime().toMSecsSinceEpoch() << std::endl;
	}
	SYSTEMTIME myTime = localTime.ToSystmTime();

	return SetLocalTime(&myTime);
}

void ClockClient::SendLocaltime()
{
	DateTime currTime{};
	currTime.FromQDateTime(QDateTime::currentDateTime());

	m_clockPag.time = currTime;
	memset(m_clockPag.client, 0, sizeof(m_clockPag.client));
	char* ptr = const_cast<char*>(m_localIp.toStdString().c_str());
	std::string str = m_localIp.toLocal8Bit().toStdString();
	m_clockPag.state = 1;
	for (int i = 0; i < m_localIp.size(); ++i)
	{
		m_clockPag.client[i] = m_localIp[i].toLatin1();
	}
	int len = m_send.writeDatagram(reinterpret_cast<char*>(&m_clockPag), sizeof(m_clockPag),
		QHostAddress::Broadcast, m_localPort);

	if (len != sizeof(m_clockPag))
	{
		return ;
	}
	m_oldTime = currTime;

	return ;
}

QString ClockClient::GetLocalIp()
{
	QString ipAddress;
	foreach(QNetworkInterface netInterface, QNetworkInterface::allInterfaces())
	{
		QList<QNetworkAddressEntry>  entryList = netInterface.addressEntries();

		foreach(QNetworkAddressEntry entry, entryList)
		{
			if (entry.ip().protocol() == QAbstractSocket::IPv4Protocol && entry.ip() != QHostAddress::LocalHost)
			{
				QString LocalIP = entry.ip().toString();
				
				if (LocalIP.startsWith("192.", Qt::CaseSensitive))
				{
					ipAddress = entry.ip().toString();
				}
			}
		}
	}
	return ipAddress;
}

主函数 main.cpp

#include "ClockClient.h"
#include <iostream>
#include <string>
#include <string.h>
#include "Windows.h"
#include <tchar.h>
#include <comdef.h>
#include "tlhelp32.h"
int g_step = 20;

int main(int argc, char* argv[])
{
	QCoreApplication a(argc, argv);
	ClockClient client;
	client.Start();

	return a.exec();
}

4.3、服务端实现

头文件 ClockServer.h

#pragma once

#include "DateTime.h"

#include <QUdpsocket>
#include <QNetworkDatagram>

constexpr int gc_port = 10023;
constexpr int gc_recvPort = 10024;
class ClockServer : public QObject
{
	Q_OBJECT
public:
	ClockServer();
	HANDLE GetProcessHandleByID(DWORD nID);
	std::vector<DWORD> GetProcessIDByname(const char * name);
	void AppOfSelfCheck(const char * name);
private:
	bool InitNetWork();
	void onCheckTime();
private:
	qint32 m_localPort;

	QUdpSocket m_send;
	QUdpSocket m_rece;
};

源文件 ClockServer.cpp

#include "ClockServer.h"

#include <iostream>

ClockServer::ClockServer()
	:m_localPort{gc_port}
{
	bool ret = InitNetWork();
	if (false == ret)
	{
		std::cout << "网络初始化失败" << std::endl;
		exit(EXIT_FAILURE);
	}
}

bool ClockServer::InitNetWork()
{
	bool ret = m_rece.bind(/*QHostAddress(m_localIp),*/ m_localPort);
	if (false == ret)return ret;
	ret = connect(&m_rece, &QUdpSocket::readyRead, this, &ClockServer::onCheckTime);
	m_rece.setSocketOption(QAbstractSocket::MulticastLoopbackOption, 0);
	return ret;
}

void ClockServer::onCheckTime()
{
	while (true == m_rece.hasPendingDatagrams())
	{
		QByteArray ba = m_rece.receiveDatagram().data();

		char* buffer = new char[ba.size()];
		if(nullptr == buffer)continue;
		memcpy(buffer, ba.data(), ba.size());

		ClockPag data;
		data = *reinterpret_cast<ClockPag*>(buffer);
		delete[] buffer;
		buffer = nullptr;
		if (data.state != 1)
		{
			continue;
		}
		int port = gc_port;
		DateTime currTime{};
		currTime.FromQDateTime(QDateTime::currentDateTime());
		data.state = 2;
		data.time = currTime;
		int len = m_send.writeDatagram(reinterpret_cast<char*>(&data),
			sizeof(data),
			QHostAddress::Broadcast, gc_recvPort);

		qDebug() << u8"收到 ip:" << QString(data.client) << u8"的时钟校验包";
	}
}

主程序mian.cpp

#include <QtCore/QCoreApplication>

#include "ClockServer.h"

int main(int argc, char* argv[])
{
	QCoreApplication a(argc, argv);
	ClockServer server;
	return a.exec();
}

说明

这里我删减了一些不重要代码,原本都是纯后台的。所以我懒得再改代码截效果了。请读者自便。有技术问题请联系我。文章来源地址https://www.toymoban.com/news/detail-491918.html

到了这里,关于分布式各系统时间统一程序的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用Docker构建分布式应用程序

    作者:禅与计算机程序设计艺术 Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个轻量级、可移植的容器中,然后发布到任何流行的Linux或Windows机器上,也可以实现虚拟化。 传统的应用分为三层结构:应用运行环境、应用逻辑和资源管理,Doc

    2024年02月08日
    浏览(73)
  • 【前端版】分布式医疗云平台【登陆页面修改、页面 title 修改、登陆接口准备说明、把前端和后端统一使用 git 管理、启动前端 VUE 项目、用户登陆】(十七)

    目录 2.8.【前端】登陆页面修改 2.8.1.主页退出 2.8.2.登陆页面修改 2.9.【前端】页面 title 修改

    2024年02月07日
    浏览(44)
  • Gateway+Springsecurity+OAuth2.0+JWT 实现分布式统一认证授权!

    目录 1. OAuth2.0授权服务 2. 资源服务 3. Gateway网关 4. 测试   在SpringSecurity+OAuth2.0 搭建认证中心和资源服务中心-CSDN博客 ​​​​​​ 基础上整合网关和JWT实现分布式统一认证授权。   大致流程如下: 1、客户端发出请求给网关获取令牌 2、网关收到请求,直接转发给授权服务

    2024年01月24日
    浏览(49)
  • 使用 Docker 部署分布式存储系统——Ceph

    最近工作中接触了一个 Python + Flask 的新项目,项目中使用了 Ceph 对象存储服务。遂在开发环境使用 Docker 搭建了一套 Ceph 集群。 Ceph 官方文档 Ceph 是一个开源的分布式存储系统,提供了对象存储、块存储和文件系统三种存储接口。Ceph 将数据存储在逻辑存储池中,使用 CRUSH 分

    2024年04月15日
    浏览(48)
  • 1.使用分布式文件系统Minio管理文件

    文件系统 文件系统 是操作系统用于组织管理 存储设备(磁盘)或分区 上文件信息的方法和数据结构,负责对文件存储设备空间进行组织和分配,并对存入文件进行保护和检索 文件系统是负责管理和存储文件的系统软件,操作系统通过文件系统提供的接口去存取文件,用户通过操

    2024年01月22日
    浏览(40)
  • 使用docker搭建minio分布式对象存储系统

    这里我简单的和大家介绍一下什么是minio ? 附上Minio官网链接:https://minio.org.cn/ MinIO是一种开源的对象存储服务器,通过使用标准的HTTP/REST API来访问和管理数据。它采用分布式架构,具有高性能、高可用性和可扩展性。MinIO可以帮助用户轻松管理和存储大量的非结构化数据,

    2024年02月12日
    浏览(52)
  • 使用Docker部署开源分布式任务调度系统DolphinScheduler

    🔥 博客主页 : 小羊失眠啦. 🎥 系列专栏 : 《C语言》 《数据结构》 《Linux》 《Cpolar》 ❤️ 感谢大家点赞👍收藏⭐评论✍️ 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 本篇教程和大家分享一下DolphinSc

    2024年02月05日
    浏览(60)
  • Hadoop的分布式文件存储系统HDFS组件的使用

    存储整个HDFS集群的元数据(metaData) —— 整个集群中存储的目录和文件的索引 管理整个HDFS集群 接收客户端的请求 负责节点的故障转移 存储数据,是以block块的形式进行数据的存放。 默认情况下block块的大小是128M。 blocksize大小的计算公式: 寻址时间:下载文件时找到文件

    2024年02月09日
    浏览(73)
  • 分布式电源接入对配电网的影响matlab程序(IEEE9节点系统算例)

    分布式电源接入对配电网的影响matlab程序(IEEE9节点系统算例) 摘 要:分布式电源的接入使得配电系统从放射状无源网络变为分布有中小型电源的有源网络。带来了使单向流动的电流方向具有了不确定性等等问题,使得配电系统的控制和管理变得更加复杂。但同时,分布式电

    2024年01月16日
    浏览(54)
  • 分布式项目14 使用dubbo进行系统之间的通信,不用jsonp

    使用jsonp技术,前端的ajax需要把方法的datatype写成jsonp,并且在controller类中返回值类型是jsonPObject,这个是特有的java的api,用于jsonp技术。 分布式项目可以使用dubbo框架。 第一步:导入dubbo依赖 第二步: 编辑服务provider,在公共模块创建dubbo接口 在jt-common中创建: 第三步:在

    2024年02月08日
    浏览(87)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包