rfc-timeProtocol-multicast/python/multicast-timeservice/timeservice.py

66 lines
1.9 KiB
Python
Raw Normal View History

2023-06-20 13:54:34 +00:00
#!/usr/bin/env python
from socket import AF_INET6, AF_INET
import socket
from socket import AddressFamily
import struct
# Bugfix for Python 3.6 for Windows ... missing IPPROTO_IPV6 constant
#if not hasattr(socket, 'IPPROTO_IPV6'):
# socket.IPPROTO_IPV6 = 41
multicast_address = {
AF_INET: ["224.0.1.37"],
AF_INET6: ["FF02:0:0:0:0:0:0:37"]
}
multicast_port = 37
def get_ips():
ipv4 = list()
ipv6 = list()
for item in socket.getaddrinfo(socket.gethostname(), None):
protocol, *_, (ip, *_) = item
if protocol == AddressFamily.AF_INET:
ipv4.append(ip)
elif protocol == AddressFamily.AF_INET6:
ipv6.append(ip)
return ipv4, ipv6
if __name__ == '__main__':
all_ipv4, all_ipv6 = get_ips()
print(all_ipv4)
print(all_ipv6)
addr_info = socket.getaddrinfo('127.0.0.1', None) # get all ip
for addr in addr_info:
family = addr[0]
local_address = addr[4][0]
print(local_address)
sock = socket.socket(family, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((local_address, multicast_port))
if family == AF_INET:
for multicast_group in multicast_address[family]:
sock.setsockopt(
socket.IPPROTO_IP,
socket.IP_ADD_MEMBERSHIP,
socket.inet_aton(multicast_group) + socket.inet_aton(local_address)
)
elif family == AF_INET6:
for multicast_group in multicast_address[family]:
ipv6mr_interface = struct.pack('i', addr[4][3])
ipv6_mreq = socket.inet_pton(socket.AF_INET6, multicast_group) + ipv6mr_interface
sock.setsockopt(
socket.IPPROTO_IPV6,
socket.IPV6_JOIN_GROUP,
ipv6_mreq
)
# _transport, _protocol = await loop.create_datagram_endpoint(
# lambda: protocol_factory(), sock=sock)