76 lines
2.6 KiB
Python
76 lines
2.6 KiB
Python
import logging
|
|
from typing import Dict, Optional, Any
|
|
|
|
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
|
|
OTLPLogExporter as GRPCExporter,
|
|
)
|
|
from opentelemetry.exporter.otlp.proto.http._log_exporter import (
|
|
OTLPLogExporter as HTTPExporter,
|
|
)
|
|
from opentelemetry.sdk.resources import Resource
|
|
from opentelemetry.sdk._logs.export import LogExporter, BatchLogRecordProcessor
|
|
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
|
|
|
|
from opentelemetry.instrumentation.logging import LoggingInstrumentor
|
|
|
|
|
|
class LoggerWrapper(object):
|
|
resource_attributes: Dict[Any, Any] = {}
|
|
endpoint: Optional[str] = None
|
|
headers: Dict[str, str] = {}
|
|
__logging_exporter: Optional[LogExporter] = None
|
|
__logging_provider: Optional[LoggerProvider] = None
|
|
|
|
def __new__(cls, exporter: Optional[LogExporter] = None) -> "LoggerWrapper":
|
|
if not hasattr(cls, "instance"):
|
|
obj = cls.instance = super(LoggerWrapper, cls).__new__(cls)
|
|
if not LoggerWrapper.endpoint:
|
|
return obj
|
|
|
|
obj.__logging_exporter = (
|
|
exporter
|
|
if exporter
|
|
else init_logging_exporter(LoggerWrapper.endpoint, LoggerWrapper.headers)
|
|
)
|
|
obj.__logging_provider = init_logging_provider(
|
|
obj.__logging_exporter, LoggerWrapper.resource_attributes
|
|
)
|
|
LoggingInstrumentor().instrument(set_logging_format=True)
|
|
|
|
return cls.instance
|
|
|
|
@staticmethod
|
|
def set_static_params(
|
|
resource_attributes: dict,
|
|
endpoint: str,
|
|
headers: Dict[str, str],
|
|
) -> None:
|
|
LoggerWrapper.resource_attributes = resource_attributes
|
|
LoggerWrapper.endpoint = endpoint
|
|
LoggerWrapper.headers = headers
|
|
|
|
|
|
def init_logging_exporter(endpoint: str, headers: Dict[str, str]) -> LogExporter:
|
|
if "http" in endpoint.lower() or "https" in endpoint.lower():
|
|
return HTTPExporter(endpoint=f"{endpoint}/v1/logs", headers=headers)
|
|
else:
|
|
return GRPCExporter(endpoint=endpoint, headers=headers)
|
|
|
|
|
|
def init_logging_provider(
|
|
exporter: LogExporter, resource_attributes: Optional[Dict[Any, Any]] = None
|
|
) -> LoggerProvider:
|
|
resource = (
|
|
Resource.create(resource_attributes)
|
|
if resource_attributes
|
|
else Resource.create()
|
|
)
|
|
|
|
logger_provider = LoggerProvider(resource=resource)
|
|
logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))
|
|
|
|
logging_handler = LoggingHandler(level=logging.NOTSET, logger_provider=logger_provider)
|
|
logging.basicConfig(level=logging.INFO, handlers=[logging_handler])
|
|
|
|
return logger_provider
|